You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:45 UTC
[04/18] storm git commit: STORM-2702: storm-loadgen
STORM-2702: storm-loadgen
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c2dcbed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c2dcbed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c2dcbed
Branch: refs/heads/master
Commit: 6c2dcbedabb88970697f42b6f66bf64177e2ac9c
Parents: 0d10b8a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Aug 21 14:36:10 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Aug 21 14:36:10 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/pom.xml | 13 +-
.../org/apache/storm/loadgen/CaptureLoad.java | 341 ++++++++
.../org/apache/storm/loadgen/CompStats.java | 52 ++
.../storm/loadgen/EstimateThroughput.java | 108 +++
.../java/org/apache/storm/loadgen/GenLoad.java | 235 ++++++
.../org/apache/storm/loadgen/GroupingType.java | 91 +++
.../loadgen/HttpForwardingMetricsConsumer.java | 51 +-
.../loadgen/HttpForwardingMetricsServer.java | 79 +-
.../org/apache/storm/loadgen/InputStream.java | 263 +++++++
.../java/org/apache/storm/loadgen/LoadBolt.java | 146 ++++
.../org/apache/storm/loadgen/LoadCompConf.java | 222 ++++++
.../org/apache/storm/loadgen/LoadEngine.java | 45 ++
.../apache/storm/loadgen/LoadMetricsServer.java | 784 +++++++++++++++++++
.../org/apache/storm/loadgen/LoadSpout.java | 137 ++++
.../apache/storm/loadgen/NormalDistStats.java | 151 ++++
.../org/apache/storm/loadgen/OutputStream.java | 121 +++
.../storm/loadgen/OutputStreamEngine.java | 122 +++
.../apache/storm/loadgen/ScopedTopologySet.java | 91 +++
.../storm/loadgen/ThroughputVsLatency.java | 405 ++++------
.../apache/storm/loadgen/TopologyLoadConf.java | 432 ++++++++++
.../apache/storm/loadgen/LoadCompConfTest.java | 57 ++
.../storm/loadgen/LoadMetricsServerTest.java | 36 +
.../storm/loadgen/NormalDistStatsTest.java | 43 +
.../apache/storm/loadgen/OutputStreamTest.java | 37 +
24 files changed, 3724 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/pom.xml b/examples/storm-loadgen/pom.xml
index f75e575..1b2e4f7 100644
--- a/examples/storm-loadgen/pom.xml
+++ b/examples/storm-loadgen/pom.xml
@@ -41,14 +41,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>${project.version}</version>
@@ -60,6 +52,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
+ <artifactId>storm-client-misc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
<artifactId>storm-metrics</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
new file mode 100644
index 0000000..e748efa
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.utils.NimbusClient;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Capture running topologies for load gen later on.
+ */
+public class CaptureLoad {
+ private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
+ public static final String DEFAULT_OUT_DIR = "./loadgen/";
+
+ /**
+  * Gather the values that the executors of one bolt component report for a single input stream.
+  *
+  * <p>For every executor that has stats set, {@code func} selects one of the nested maps held by
+  * its {@link BoltStats}. The entry for {@code id}, when present, is taken from each inner map.
+  *
+  * @param summaries the executors to inspect, may be null
+  * @param id the input stream whose values are wanted
+  * @param func picks which statistic to read out of the bolt stats
+  * @return every non-null value that was found, empty if there were none
+  */
+ private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
+         GlobalStreamId id,
+         Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
+     List<Double> collected = new ArrayList<>();
+     if (summaries == null) {
+         return collected;
+     }
+     for (ExecutorSummary executor : summaries) {
+         if (executor == null || !executor.is_set_stats()) {
+             //Nothing was reported for this executor
+             continue;
+         }
+         Map<String, Map<GlobalStreamId, Double>> selected =
+             func.apply(executor.get_stats().get_specific().get_bolt());
+         if (selected == null) {
+             continue;
+         }
+         for (Map<GlobalStreamId, Double> perStream : selected.values()) {
+             Double value = perStream.get(id);
+             if (value != null) {
+                 collected.add(value);
+             }
+         }
+     }
+     return collected;
+ }
+
+ /**
+  * Capture the structure and the measured load of a running topology.
+  *
+  * <p>The topology info, page info, user topology and JSON config are fetched from nimbus first.
+  * From those this builds one component builder per bolt and spout, one input stream per bolt
+  * input (with process/execute latency stats) and one output stream per declared stream
+  * (with an emitted tuples/sec rate computed per executor and per time window).
+  *
+  * @param client the nimbus client used to look up the topology
+  * @param topologySummary the summary (name and id) of the topology to capture
+  * @return the captured topology description
+  * @throws Exception on any error
+  */
+ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
+     String topologyName = topologySummary.get_name();
+     LOG.info("Capturing {}...", topologyName);
+     String topologyId = topologySummary.get_id();
+     TopologyInfo info = client.getTopologyInfo(topologyId);
+     TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
+     @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+     StormTopology topo = client.getUserTopology(topologyId);
+     //Done capturing topology information...
+
+     //Only keep the config entries that matter for reproducing the load
+     Map<String, Object> savedTopoConf = new HashMap<>();
+     Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
+     for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) {
+         Object o = topoConf.get(key);
+         if (o != null) {
+             savedTopoConf.put(key, o);
+             LOG.info("with config {}: {}", key, o);
+         }
+     }
+     //Lets use the number of actually scheduled workers as a way to bridge RAS and non-RAS
+     int numWorkers = tpinfo.get_num_workers();
+     if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) {
+         numWorkers = Math.max(numWorkers, ((Number)savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue());
+     }
+     savedTopoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
+
+     Map<String, LoadCompConf.Builder> boltBuilders = new HashMap<>();
+     Map<String, LoadCompConf.Builder> spoutBuilders = new HashMap<>();
+     List<InputStream.Builder> inputStreams = new ArrayList<>();
+     Map<GlobalStreamId, OutputStream.Builder> outStreams = new HashMap<>();
+
+     //Bolts
+     if (topo.get_bolts() != null) {
+         for (Map.Entry<String, Bolt> boltSpec : topo.get_bolts().entrySet()) {
+             String boltComp = boltSpec.getKey();
+             LOG.info("Found bolt {}...", boltComp);
+             Bolt bolt = boltSpec.getValue();
+             ComponentCommon common = bolt.get_common();
+             Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+             if (inputs != null) {
+                 for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+                     GlobalStreamId id = input.getKey();
+                     LOG.info("with input {}...", id);
+                     Grouping grouping = input.getValue();
+                     InputStream.Builder builder = new InputStream.Builder()
+                         .withId(id.get_streamId())
+                         .withFromComponent(id.get_componentId())
+                         .withToComponent(boltComp)
+                         .withGroupingType(grouping);
+                     inputStreams.add(builder);
+                 }
+             }
+             Map<String, StreamInfo> outputs = common.get_streams();
+             if (outputs != null) {
+                 for (String name : outputs.keySet()) {
+                     GlobalStreamId id = new GlobalStreamId(boltComp, name);
+                     LOG.info("and output {}...", id);
+                     OutputStream.Builder builder = new OutputStream.Builder()
+                         .withId(name);
+                     outStreams.put(id, builder);
+                 }
+             }
+             LoadCompConf.Builder builder = new LoadCompConf.Builder()
+                 .withParallelism(common.get_parallelism_hint())
+                 .withId(boltComp);
+             boltBuilders.put(boltComp, builder);
+         }
+     }
+
+     //Spouts
+     if (topo.get_spouts() != null) {
+         for (Map.Entry<String, SpoutSpec> spoutSpec : topo.get_spouts().entrySet()) {
+             String spoutComp = spoutSpec.getKey();
+             LOG.info("Found Spout {}...", spoutComp);
+             SpoutSpec spout = spoutSpec.getValue();
+             ComponentCommon common = spout.get_common();
+
+             Map<String, StreamInfo> outputs = common.get_streams();
+             if (outputs != null) {
+                 for (String name : outputs.keySet()) {
+                     GlobalStreamId id = new GlobalStreamId(spoutComp, name);
+                     LOG.info("with output {}...", id);
+                     OutputStream.Builder builder = new OutputStream.Builder()
+                         .withId(name);
+                     outStreams.put(id, builder);
+                 }
+             }
+             LoadCompConf.Builder builder = new LoadCompConf.Builder()
+                 .withParallelism(common.get_parallelism_hint())
+                 .withId(spoutComp);
+             spoutBuilders.put(spoutComp, builder);
+         }
+     }
+
+     //Stats...
+     //Group the executors by the component they belong to
+     Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
+     for (ExecutorSummary executor: info.get_executors()) {
+         byComponent.computeIfAbsent(executor.get_component_id(), (k) -> new ArrayList<>()).add(executor);
+     }
+
+     List<InputStream> streams = new ArrayList<>(inputStreams.size());
+     //Compute the stats for the different input streams
+     for (InputStream.Builder builder : inputStreams) {
+         GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
+         List<ExecutorSummary> summaries = byComponent.get(builder.getToComponent());
+         //Execute and process latency...
+         builder.withProcessTime(new NormalDistStats(
+             extractBoltValues(summaries, streamId, BoltStats::get_process_ms_avg)));
+         builder.withExecTime(new NormalDistStats(
+             extractBoltValues(summaries, streamId, BoltStats::get_execute_ms_avg)));
+         //InputStream is done
+         streams.add(builder.build());
+     }
+
+     //There is a bug in some versions that returns 0 for the uptime.
+     // To work around it we should get it an alternative (working) way.
+     Map<String, Integer> workerToUptime = new HashMap<>();
+     for (WorkerSummary ws : tpinfo.get_workers()) {
+         workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
+     }
+     LOG.debug("WORKER TO UPTIME {}", workerToUptime);
+
+     for (Map.Entry<GlobalStreamId, OutputStream.Builder> entry : outStreams.entrySet()) {
+         OutputStream.Builder builder = entry.getValue();
+         GlobalStreamId id = entry.getKey();
+         List<Double> emittedRate = new ArrayList<>();
+         List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
+         if (summaries != null) {
+             for (ExecutorSummary summary: summaries) {
+                 if (summary.is_set_stats()) {
+                     int uptime = summary.get_uptime_secs();
+                     LOG.debug("UPTIME {}", uptime);
+                     if (uptime <= 0) {
+                         //Likely it is because of a bug, so try to get it another way
+                         String key = summary.get_host() + ":" + summary.get_port();
+                         uptime = workerToUptime.getOrDefault(key, 1);
+                         LOG.debug("Getting uptime for worker {}, {}", key, uptime);
+                     }
+                     for (Map.Entry<String, Map<String, Long>> statEntry : summary.get_stats().get_emitted().entrySet()) {
+                         String timeWindow = statEntry.getKey();
+                         long timeSecs = uptime;
+                         try {
+                             timeSecs = Long.parseLong(timeWindow);
+                         } catch (NumberFormatException e) {
+                             //Ignored... a non-numeric window falls back to the uptime
+                         }
+                         //The executor cannot have emitted for longer than it has been up.
+                         // Clamp to at least 1 second: the fallback uptime or the window can be 0 (or less),
+                         // and dividing by that would put Infinity/NaN (or a negative rate) into the stats.
+                         timeSecs = Math.max(1L, Math.min(timeSecs, uptime));
+                         Long count = statEntry.getValue().get(id.get_streamId());
+                         if (count != null) {
+                             LOG.debug("{} emitted {} for {} secs or {} tuples/sec",
+                                 id, count, timeSecs, count.doubleValue() / timeSecs);
+                             emittedRate.add(count.doubleValue() / timeSecs);
+                         }
+                     }
+                 }
+             }
+         }
+         builder.withRate(new NormalDistStats(emittedRate));
+         //TODO to know if the output keys are skewed we have to guess by looking
+         // at the down stream executed stats, but for now we are going to ignore it
+
+         //The OutputStream is done
+         //Every output stream was registered from either a bolt or a spout above
+         LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
+         if (comp == null) {
+             comp = spoutBuilders.get(id.get_componentId());
+         }
+         comp.withStream(builder.build());
+     }
+
+     List<LoadCompConf> spouts = spoutBuilders.values().stream()
+         .map((b) -> b.build())
+         .collect(Collectors.toList());
+
+     List<LoadCompConf> bolts = boltBuilders.values().stream()
+         .map((b) -> b.build())
+         .collect(Collectors.toList());
+
+     return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
+ }
+
+ /**
+ * Main entry point for CaptureLoad command.
+ * @param args the arguments to the command
+ * @throws Exception on any error
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption(Option.builder("a")
+ .longOpt("anonymize")
+ .desc("Strip out any possibly identifiable information")
+ .build());
+ options.addOption(Option.builder("o")
+ .longOpt("output-dir")
+ .argName("<file>")
+ .hasArg()
+ .desc("Where to write (defaults to " + DEFAULT_OUT_DIR + ")")
+ .build());
+ options.addOption(Option.builder("h")
+ .longOpt("help")
+ .desc("Print a help message")
+ .build());
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = null;
+ ParseException pe = null;
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+ pe = e;
+ }
+ if (pe != null || cmd.hasOption('h')) {
+ if (pe != null) {
+ System.err.println("ERROR " + pe.getMessage());
+ }
+ new HelpFormatter().printHelp("CaptureLoad [options] [topologyName]*", options);
+ return;
+ }
+
+ Config conf = new Config();
+ int exitStatus = -1;
+ String outputDir = DEFAULT_OUT_DIR;
+ if (cmd.hasOption('o')) {
+ outputDir = cmd.getOptionValue('o');
+ }
+ File baseOut = new File(outputDir);
+ LOG.info("Will save captured topologies to {}", baseOut);
+ baseOut.mkdirs();
+
+ try (NimbusClient nc = NimbusClient.getConfiguredClient(conf)) {
+ Nimbus.Iface client = nc.getClient();
+ List<String> topologyNames = cmd.getArgList();
+
+ ClusterSummary clusterSummary = client.getClusterInfo();
+ for (TopologySummary topologySummary: clusterSummary.get_topologies()) {
+ if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
+ TopologyLoadConf capturedConf = captureTopology(client, topologySummary);
+ if (cmd.hasOption('a')) {
+ capturedConf = capturedConf.anonymize();
+ }
+ capturedConf.writeTo(new File(baseOut, capturedConf.name + ".yaml"));
+ }
+ }
+
+ exitStatus = 0;
+ } catch (Exception e) {
+ LOG.error("Error trying to capture topologies...", e);
+ } finally {
+ System.exit(exitStatus);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
new file mode 100644
index 0000000..d0e0bd3
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * Approximate resource measurements for one component (bolt/spout), kept so that the
+ * component can be statistically reproduced later.
+ */
+public class CompStats implements Serializable {
+    // Only single values are stored: there is currently no good way to measure
+    // a distribution for either of these, so both are approximate.
+    public final double cpuPercent;
+    public final double memoryMb;
+
+    public CompStats(double cpuPercent, double memoryMb) {
+        this.cpuPercent = cpuPercent;
+        this.memoryMb = memoryMb;
+    }
+
+    /**
+     * Build a CompStats from the "cpuPercent" and "memoryMb" entries of a config map.
+     * An entry that is missing is read as 0.0.
+     * @param conf the map holding the CompStats values
+     * @return the parsed CompStats
+     */
+    public static CompStats fromConf(Map<String, Object> conf) {
+        return new CompStats(
+            ObjectReader.getDouble(conf.get("cpuPercent"), 0.0),
+            ObjectReader.getDouble(conf.get("memoryMb"), 0.0));
+    }
+
+    /**
+     * Store these stats into a config map under the same keys that {@link #fromConf(Map)} reads.
+     * @param ret the map to add the values to
+     */
+    public void addToConf(Map<String, Object> ret) {
+        ret.put("cpuPercent", cpuPercent);
+        ret.put("memoryMb", memoryMb);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
new file mode 100644
index 0000000..80ede37
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.loadgen.CaptureLoad;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Estimate the throughput of all topologies.
+ */
+public class EstimateThroughput {
+ private static final Logger LOG = LoggerFactory.getLogger(EstimateThroughput.class);
+
+ /**
+ * Main entry point for estimate throughput command.
+ * @param args the command line arguments.
+ * @throws Exception on any error.
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption(Option.builder("h")
+ .longOpt("help")
+ .desc("Print a help message")
+ .build());
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = null;
+ ParseException pe = null;
+ try {
+ cmd = parser.parse(options, args);
+ } catch (ParseException e) {
+ pe = e;
+ }
+ if (pe != null || cmd.hasOption('h')) {
+ if (pe != null) {
+ System.err.println("ERROR " + pe.getMessage());
+ }
+ new HelpFormatter().printHelp("EstimateThroughput [options] [topologyName]*", options);
+ return;
+ }
+
+ Config conf = new Config();
+ int exitStatus = -1;
+
+ List<TopologyLoadConf> regular = new ArrayList<>();
+ List<TopologyLoadConf> trident = new ArrayList<>();
+
+ try (NimbusClient nc = NimbusClient.getConfiguredClient(conf)) {
+ Nimbus.Iface client = nc.getClient();
+ List<String> topologyNames = cmd.getArgList();
+
+ ClusterSummary clusterSummary = client.getClusterInfo();
+ for (TopologySummary topologySummary: clusterSummary.get_topologies()) {
+ if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
+ TopologyLoadConf capturedConf = CaptureLoad.captureTopology(client, topologySummary);
+ if (capturedConf.looksLikeTrident()) {
+ trident.add(capturedConf);
+ } else {
+ regular.add(capturedConf);
+ }
+ }
+ }
+
+ System.out.println("TOPOLOGY\tTOTAL MESSAGES/sec\tESTIMATED INPUT MESSAGES/sec");
+ for (TopologyLoadConf tl: regular) {
+ System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getSpoutEmittedAggregate());
+ }
+ for (TopologyLoadConf tl: trident) {
+ System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getTridentEstimatedEmittedAggregate());
+ }
+ exitStatus = 0;
+ } catch (Exception e) {
+ LOG.error("Error trying to capture topologies...", e);
+ } finally {
+ System.exit(exitStatus);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
new file mode 100644
index 0000000..7998fdc
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generate a simulated load.
+ */
+public class GenLoad {
+ private static final Logger LOG = LoggerFactory.getLogger(GenLoad.class);
+ private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
+
+ /**
+ * Main entry point for GenLoad application.
+ * @param args the command line args.
+ * @throws Exception on any error.
+ */
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption(Option.builder("h")
+ .longOpt("help")
+ .desc("Print a help message")
+ .build());
+ options.addOption(Option.builder("t")
+ .longOpt("test-time")
+ .argName("MINS")
+ .hasArg()
+ .desc("How long to run the tests for in mins (defaults to " + TEST_EXECUTE_TIME_DEFAULT + ")")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("parallel")
+ .argName("MULTIPLIER")
+ .hasArg()
+ .desc("How much to scale the topology up or down in parallelism.\n"
+ + "The new parallelism will round up to the next whole number\n"
+ + "(defaults to 1.0 no scaling)")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("throughput")
+ .argName("MULTIPLIER")
+ .hasArg()
+ .desc("How much to scale the topology up or down in throughput.\n"
+ + "Note this is applied after and build on any parallelism changes.\n"
+ + "(defaults to 1.0 no scaling)")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("local-or-shuffle")
+ .desc("replace shuffle grouping with local or shuffle grouping")
+ .build());
+ options.addOption(Option.builder()
+ .longOpt("debug")
+ .desc("Print debug information about the adjusted topology before submitting it.")
+ .build());
+ LoadMetricsServer.addCommandLineOptions(options);
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = null;
+ Exception commandLineException = null;
+ double executeTime = TEST_EXECUTE_TIME_DEFAULT;
+ double parallel = 1.0;
+ double throughput = 1.0;
+ try {
+ cmd = parser.parse(options, args);
+ if (cmd.hasOption("t")) {
+ executeTime = Double.valueOf(cmd.getOptionValue("t"));
+ }
+ if (cmd.hasOption("parallel")) {
+ parallel = Double.parseDouble(cmd.getOptionValue("parallel"));
+ }
+ if (cmd.hasOption("throughput")) {
+ throughput = Double.parseDouble(cmd.getOptionValue("throughput"));
+ }
+ } catch (ParseException | NumberFormatException e) {
+ commandLineException = e;
+ }
+ if (commandLineException != null || cmd.hasOption('h')) {
+ if (commandLineException != null) {
+ System.err.println("ERROR " + commandLineException.getMessage());
+ }
+ new HelpFormatter().printHelp("GenLoad [options] [captured_file]*", options);
+ return;
+ }
+ Config conf = new Config();
+ LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+
+ metricServer.serve();
+ String url = metricServer.getUrl();
+ int exitStatus = -1;
+ try (NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ ScopedTopologySet topoNames = new ScopedTopologySet(client.getClient())) {
+ for (String topoFile : cmd.getArgList()) {
+ try {
+ TopologyLoadConf tlc = readTopology(topoFile);
+ if (parallel != 1.0) {
+ tlc = tlc.scaleParallel(parallel);
+ }
+ if (throughput != 1.0) {
+ tlc = tlc.scaleThroughput(throughput);
+ }
+ if (cmd.hasOption("local-or-shuffle")) {
+ tlc = tlc.replaceShuffleWithLocalOrShuffle();
+ }
+ if (cmd.hasOption("debug")) {
+ LOG.info("DEBUGGING: {}", tlc.toYamlString());
+ }
+ topoNames.add(parseAndSubmit(tlc, url));
+ } catch (Exception e) {
+ System.err.println("Could Not Submit Topology From " + topoFile);
+ e.printStackTrace(System.err);
+ }
+ }
+
+ metricServer.monitorFor(executeTime, client.getClient(), topoNames);
+ exitStatus = 0;
+ } catch (Exception e) {
+ LOG.error("Error trying to run topologies...", e);
+ } finally {
+ System.exit(exitStatus);
+ }
+ }
+
+ /**
+  * Read a captured topology from a file.
+  *
+  * <p>If the loaded topology has no name, one is derived from the file name by dropping
+  * everything from the last '.' on. A file name without any '.' is used as is.
+  * @param topoFile the path of the file to read.
+  * @return the loaded topology, which always has a name.
+  * @throws IOException on any error reading the file.
+  */
+ private static TopologyLoadConf readTopology(String topoFile) throws IOException {
+     File f = new File(topoFile);
+
+     TopologyLoadConf tlc = TopologyLoadConf.fromConf(f);
+     if (tlc.name == null) {
+         String fileName = f.getName();
+         int dot = fileName.lastIndexOf('.');
+         //lastIndexOf returns -1 when the file has no extension, and substring(0, -1)
+         // would throw a StringIndexOutOfBoundsException, so keep the full name then.
+         final String baseName = dot >= 0 ? fileName.substring(0, dot) : fileName;
+         tlc = tlc.withName(baseName);
+     }
+     return tlc;
+ }
+
+ static int uniquifier = 0;
+
+ /**
+  * Turn a captured topology into a real topology made of load generating spouts and bolts
+  * and submit it.
+  *
+  * <p>The submitted name is the name of the captured topology with "-" and a counter
+  * appended, so the same capture can be submitted more than once.
+  * @param tlc the captured topology to submit.
+  * @param url where the http forwarding metrics consumer should send its metrics.
+  * @return the name the topology was submitted under.
+  */
+ private static String parseAndSubmit(TopologyLoadConf tlc, String url) throws IOException, InvalidTopologyException,
+     AuthorizationException, AlreadyAliveException {
+
+     //First we need some configs
+     Config conf = new Config();
+     if (tlc.topoConf != null) {
+         conf.putAll(tlc.topoConf);
+     }
+     //For some reason on the new code if ackers is null we get 0???
+     Object configuredAckers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+     Object configuredWorkers = conf.get(Config.TOPOLOGY_WORKERS);
+     boolean needsAckers = configuredAckers == null || ((Number)configuredAckers).intValue() <= 0;
+     if (needsAckers) {
+         //Fall back to the worker count for the ackers, or to 1 if that is not set either
+         conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, configuredWorkers == null ? 1 : configuredWorkers);
+     }
+     conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+     conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
+     Map<String, String> workerMetrics = new HashMap<>();
+     if (!NimbusClient.isLocalOverride()) {
+         //sigar uses JNI and does not work in local mode
+         workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+     }
+     conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+     conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+
+     //Lets build a topology.
+     TopologyBuilder topology = new TopologyBuilder();
+     for (LoadCompConf spoutConf : tlc.spouts) {
+         System.out.println("ADDING SPOUT " + spoutConf.id);
+         topology.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+     }
+
+     //Remember each bolt and its declarer by id so the input streams can be wired up below
+     Map<String, BoltDeclarer> declarersById = new HashMap<>();
+     Map<String, LoadBolt> boltsById = new HashMap<>();
+     if (tlc.bolts != null) {
+         for (LoadCompConf boltConf : tlc.bolts) {
+             System.out.println("ADDING BOLT " + boltConf.id);
+             LoadBolt loadBolt = new LoadBolt(boltConf);
+             boltsById.put(boltConf.id, loadBolt);
+             BoltDeclarer declarer = topology.setBolt(boltConf.id, loadBolt, boltConf.parallelism);
+             declarersById.put(boltConf.id, declarer);
+         }
+     }
+
+     if (tlc.streams != null) {
+         for (InputStream in : tlc.streams) {
+             String target = in.toComponent;
+             BoltDeclarer declarer = declarersById.get(target);
+             if (declarer == null) {
+                 throw new IllegalArgumentException("to bolt " + in.toComponent + " does not exist");
+             }
+             boltsById.get(target).add(in);
+             in.groupingType.assign(declarer, in);
+         }
+     }
+
+     String topoName = tlc.name + "-" + uniquifier++;
+     StormSubmitter.submitTopology(topoName, conf, topology.createTopology());
+     return topoName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
new file mode 100644
index 0000000..a4e0c1a
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Locale;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * The different types of groupings that are supported.
+ *
+ * <p>Each constant implements {@link #assign(BoltDeclarer, InputStream)} by calling the matching
+ * grouping method on the {@link BoltDeclarer}, subscribing to stream {@code stream.id} of
+ * component {@code stream.fromComponent}.
+ */
+public enum GroupingType {
+ /** Subscribes using {@code shuffleGrouping}. */
+ SHUFFLE {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.shuffleGrouping(stream.fromComponent, stream.id);
+ }
+ },
+ /** Subscribes using {@code fieldsGrouping} on the single field named "key". */
+ FIELDS {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.fieldsGrouping(stream.fromComponent, stream.id, new Fields("key"));
+ }
+ },
+ /** Subscribes using {@code allGrouping}. */
+ ALL {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.allGrouping(stream.fromComponent, stream.id);
+ }
+ },
+ /** Subscribes using {@code globalGrouping}. */
+ GLOBAL {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.globalGrouping(stream.fromComponent, stream.id);
+ }
+ },
+ /** Subscribes using {@code localOrShuffleGrouping}. */
+ LOCAL_OR_SHUFFLE {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.localOrShuffleGrouping(stream.fromComponent, stream.id);
+ }
+ },
+ /** Subscribes using {@code noneGrouping}. */
+ NONE {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.noneGrouping(stream.fromComponent, stream.id);
+ }
+ },
+ /** Subscribes using {@code partialKeyGrouping} on the single field named "key". */
+ PARTIAL_KEY {
+ @Override
+ public void assign(BoltDeclarer declarer, InputStream stream) {
+ declarer.partialKeyGrouping(stream.fromComponent, stream.id, new Fields("key"));
+ }
+ };
+
+ /**
+ * Parse a String config value and convert it into the enum.
+ *
+ * <p>The value is upper-cased (using the ENGLISH locale) before it is matched against the
+ * constant names, so the lookup is case insensitive.
+ * @param conf the string config.
+ * @return the parsed grouping type or SHUFFLE if conf is null.
+ * @throws IllegalArgumentException if parsing does not work.
+ */
+ public static GroupingType fromConf(String conf) {
+ String gt = "SHUFFLE";
+ if (conf != null) {
+ gt = conf.toUpperCase(Locale.ENGLISH);
+ }
+ return GroupingType.valueOf(gt);
+ }
+
+ /**
+ * Convert this grouping type into its String config form.
+ * @return the value of {@link #toString()} for this constant.
+ */
+ public String toConf() {
+ return toString();
+ }
+
+ /**
+ * Subscribe a bolt to a stream using this grouping.
+ * @param declarer the declarer of the bolt that consumes the stream.
+ * @param stream the stream to subscribe to; its fromComponent and id are what get passed to the declarer.
+ */
+ public abstract void assign(BoltDeclarer declarer, InputStream stream);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
index aa4579c..5829e9d 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
@@ -15,49 +15,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.misc.metric;
+package org.apache.storm.loadgen;
+
+import com.esotericsoftware.kryo.io.Output;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
-import java.net.URL;
-import java.net.HttpURLConnection;
-
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.storm.serialization.KryoValuesSerializer;
-
import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.serialization.KryoValuesSerializer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
/**
- * Listens for all metrics and POSTs them serialized to a configured URL
- *
- * To use, add this to your topology's configuration:
+ * Listens for all metrics and POSTs them serialized to a configured URL.
*
+ * <p>To use, add this to your topology's configuration:
* ```java
- * conf.registerMetricsConsumer(org.apache.storm.metrics.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
+ * conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
* ```
*
- * The body of the post is data serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
- * as a list of `[TaskInfo, Collection<DataPoint>]`. More things may be appended to the end of the list in the future.
+ * <p>The body of the post is data serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
+ * as a list of `[TaskInfo, Collection<DataPoint>]`. More things may be appended to the end of the list in the future.
*
- * The values can be deserialized using the org.apache.storm.serialization.KryoValuesDeserializer, and a
- * correct config + classpath.
+ * <p>The values can be deserialized using the org.apache.storm.serialization.KryoValuesDeserializer, and a correct config + classpath.
*
- * @see org.apache.storm.serialization.KryoValuesSerializer
+ * @see org.apache.storm.serialization.KryoValuesSerializer
*/
public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
- private transient URL _url;
- private transient IErrorReporter _errorReporter;
- private transient KryoValuesSerializer _serializer;
+ private transient URL url;
+ private transient IErrorReporter errorReporter;
+ private transient KryoValuesSerializer serializer;
@Override
public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
try {
- _url = new URL((String)registrationArgument);
- _errorReporter = errorReporter;
- _serializer = new KryoValuesSerializer(topoConf);
+ url = new URL((String)registrationArgument);
+ this.errorReporter = errorReporter;
+ serializer = new KryoValuesSerializer(topoConf);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -66,13 +63,13 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
try {
- HttpURLConnection con = (HttpURLConnection)_url.openConnection();
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("POST");
con.setDoOutput(true);
- Output out = new Output(con.getOutputStream());
- _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
- out.flush();
- out.close();
+ try (Output out = new Output(con.getOutputStream())) {
+ serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+ out.flush();
+ }
//The connection is not sent unless a response is requested
int response = con.getResponseCode();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
index ef2769a..99a980b 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -15,27 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.misc.metric;
+package org.apache.storm.loadgen;
+
+import com.esotericsoftware.kryo.io.Input;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
import java.util.Collection;
-import java.util.Map;
import java.util.List;
-import java.net.ServerSocket;
-import java.net.InetAddress;
-
+import java.util.Map;
+import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.ServletException;
-
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-
-import com.esotericsoftware.kryo.io.Input;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.utils.Utils;
-
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -44,64 +41,72 @@ import org.eclipse.jetty.servlet.ServletHolder;
* A server that can listen for metrics from the HttpForwardingMetricsConsumer.
*/
public abstract class HttpForwardingMetricsServer {
- private Map _conf;
- private Server _server = null;
- private int _port = -1;
- private String _url = null;
+ private Map conf;
+ private Server server = null;
+ private int port = -1;
+ private String url = null;
- ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
+ ThreadLocal<KryoValuesDeserializer> des = new ThreadLocal<KryoValuesDeserializer>() {
@Override
protected KryoValuesDeserializer initialValue() {
- return new KryoValuesDeserializer(_conf);
+ return new KryoValuesDeserializer(conf);
}
};
- private class MetricsCollectionServlet extends HttpServlet
- {
- protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
- {
+ private class MetricsCollectionServlet extends HttpServlet {
+ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Input in = new Input(request.getInputStream());
- List<Object> metrics = _des.get().deserializeFrom(in);
+ List<Object> metrics = des.get().deserializeFrom(in);
handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
response.setStatus(HttpServletResponse.SC_OK);
}
}
+ /**
+ * Constructor.
+ * @param conf the configuration for storm.
+ */
public HttpForwardingMetricsServer(Map<String, Object> conf) {
- _conf = Utils.readStormConfig();
+ this.conf = Utils.readStormConfig();
if (conf != null) {
- _conf.putAll(conf);
+ this.conf.putAll(conf);
}
}
//This needs to be thread safe
public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+ /**
+ * Start the server.
+ * @param port the port it should listen on, or null/<= 0 to pick a free ephemeral port.
+ */
public void serve(Integer port) {
try {
- if (_server != null) throw new RuntimeException("The server is already running");
+ if (server != null) {
+ throw new RuntimeException("The server is already running");
+ }
if (port == null || port <= 0) {
ServerSocket s = new ServerSocket(0);
port = s.getLocalPort();
s.close();
}
- _server = new Server(port);
- _port = port;
- _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
+ server = new Server(port);
+ this.port = port;
+ url = "http://" + InetAddress.getLocalHost().getHostName() + ":" + this.port + "/";
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");
- _server.setHandler(context);
+ server.setHandler(context);
context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
- _server.start();
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ server.start();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public void serve() {
@@ -109,10 +114,10 @@ public abstract class HttpForwardingMetricsServer {
}
public int getPort() {
- return _port;
+ return port;
}
public String getUrl() {
- return _url;
+ return url;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
new file mode 100644
index 0000000..19802d9
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce it.
+ */
+public class InputStream implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
+ public final String fromComponent;
+ public final String toComponent;
+ public final String id;
+ public final NormalDistStats execTime;
+ public final NormalDistStats processTime;
+ public final GroupingType groupingType;
+ //Cached GlobalStreamId
+ private GlobalStreamId gsid = null;
+
+ /**
+  * Create an input stream from a config.
+  *
+  * <p>The {@code grouping} section is optional. When it is missing the stream gets the same
+  * defaults that apply to its individual keys: a SHUFFLE grouping on the {@code "default"} stream.
+  * @param conf the config to read from.
+  * @return the read InputStream.
+  * @throws IllegalArgumentException if {@code from} or {@code to} is missing, or if the grouping
+  *     type cannot be parsed.
+  */
+ public static InputStream fromConf(Map<String, Object> conf) {
+     String component = (String) conf.get("from");
+     String toComp = (String) conf.get("to");
+     NormalDistStats execTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("execTime"));
+     NormalDistStats processTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("processTime"));
+     Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
+     if (grouping == null) {
+         //No grouping section at all: treat it as empty so the per-key defaults below apply
+         // instead of failing with a NullPointerException.
+         grouping = new HashMap<>();
+     }
+     GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
+     String streamId = (String) grouping.getOrDefault("streamId", "default");
+     return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
+ }
+
+ /**
+ * Convert this to a conf.
+ * @return the conf.
+ */
+ public Map<String, Object> toConf() {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("from", fromComponent);
+ ret.put("to", toComponent);
+ ret.put("execTime", execTime.toConf());
+ ret.put("processTime", processTime.toConf());
+
+ Map<String, Object> grouping = new HashMap<>();
+ grouping.put("streamId", id);
+ grouping.put("type", groupingType.toConf());
+ ret.put("grouping", grouping);
+
+ return ret;
+ }
+
+ public static class Builder {
+ private String fromComponent;
+ private String toComponent;
+ private String id;
+ private NormalDistStats execTime;
+ private NormalDistStats processTime;
+ private GroupingType groupingType = GroupingType.SHUFFLE;
+
+ public String getFromComponent() {
+ return fromComponent;
+ }
+
+ public Builder withFromComponent(String fromComponent) {
+ this.fromComponent = fromComponent;
+ return this;
+ }
+
+ public String getToComponent() {
+ return toComponent;
+ }
+
+ public Builder withToComponent(String toComponent) {
+ this.toComponent = toComponent;
+ return this;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public Builder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public NormalDistStats getExecTime() {
+ return execTime;
+ }
+
+ public Builder withExecTime(NormalDistStats execTime) {
+ this.execTime = execTime;
+ return this;
+ }
+
+ public NormalDistStats getProcessTime() {
+ return processTime;
+ }
+
+ public Builder withProcessTime(NormalDistStats processTime) {
+ this.processTime = processTime;
+ return this;
+ }
+
+ public GroupingType getGroupingType() {
+ return groupingType;
+ }
+
+ public Builder withGroupingType(GroupingType groupingType) {
+ this.groupingType = groupingType;
+ return this;
+ }
+
+ /**
+ * Add the grouping type based off of the thrift Grouping class.
+ *
+ * <p>Thrift grouping types that have no equivalent here are logged with a warning and
+ * replaced with SHUFFLE.
+ * @param grouping the Grouping to extract the grouping type from
+ * @return this
+ */
+ @SuppressWarnings("checkstyle:FallThrough")
+ public Builder withGroupingType(Grouping grouping) {
+ //SHUFFLE is the fallback for anything that is not explicitly mapped below
+ GroupingType group = GroupingType.SHUFFLE;
+ Grouping._Fields thriftType = grouping.getSetField();
+
+ switch (thriftType) {
+ case FIELDS:
+ //Global Grouping is fields with an empty list
+ if (grouping.get_fields().isEmpty()) {
+ group = GroupingType.GLOBAL;
+ } else {
+ group = GroupingType.FIELDS;
+ }
+ break;
+ case ALL:
+ group = GroupingType.ALL;
+ break;
+ case NONE:
+ group = GroupingType.NONE;
+ break;
+ case SHUFFLE:
+ group = GroupingType.SHUFFLE;
+ break;
+ case LOCAL_OR_SHUFFLE:
+ group = GroupingType.LOCAL_OR_SHUFFLE;
+ break;
+ case CUSTOM_SERIALIZED:
+ //This might be a partial key grouping..
+ //NOTE(review): this uses Java native deserialization on bytes taken from the topology;
+ // presumably the topology being captured is trusted - confirm.
+ byte[] data = grouping.get_custom_serialized();
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(bis);) {
+ Object cg = ois.readObject();
+ if (cg instanceof PartialKeyGrouping) {
+ group = GroupingType.PARTIAL_KEY;
+ //This break leaves the switch, so the warning below is skipped
+ break;
+ }
+ } catch (Exception e) {
+ //ignored: a grouping that cannot be deserialized is handled like any other unsupported one
+ }
+ //Fall through if not supported
+ default:
+ LOG.warn("{} is not supported for replay of a topology. Using SHUFFLE", thriftType);
+ break;
+ }
+ return withGroupingType(group);
+ }
+
+ public InputStream build() {
+ return new InputStream(fromComponent, toComponent, id, execTime, processTime, groupingType);
+ }
+ }
+
+ /**
+ * Create a new input stream to a bolt.
+ * @param fromComponent the source component of the stream.
+ * @param toComponent the component that consumes the stream.
+ * @param id the id of the stream
+ * @param execTime exec time stats
+ * @param processTime process time stats
+ * @param groupingType the grouping used to subscribe to the stream.
+ * @throws IllegalArgumentException if fromComponent, toComponent, id or groupingType is null.
+ */
+ public InputStream(String fromComponent, String toComponent, String id, NormalDistStats execTime,
+ NormalDistStats processTime, GroupingType groupingType) {
+ this.fromComponent = fromComponent;
+ this.toComponent = toComponent;
+ if (fromComponent == null) {
+ throw new IllegalArgumentException("from cannot be null");
+ }
+ if (toComponent == null) {
+ throw new IllegalArgumentException("to cannot be null");
+ }
+ this.id = id;
+ if (id == null) {
+ throw new IllegalArgumentException("id cannot be null");
+ }
+ //execTime and processTime are stored as given, they are not null checked here
+ this.execTime = execTime;
+ this.processTime = processTime;
+ this.groupingType = groupingType;
+ if (groupingType == null) {
+ throw new IllegalArgumentException("grouping type cannot be null");
+ }
+ }
+
+ /**
+ * Get the global stream id for this input stream.
+ * @return the GlobalStreamId for this input stream.
+ */
+ public synchronized GlobalStreamId gsid() {
+ if (gsid == null) {
+ gsid = new GlobalStreamId(fromComponent, id);
+ }
+ return gsid;
+ }
+
+ /**
+ * Remap the names of components.
+ *
+ * <p>Both maps must contain an entry for this stream. A missing stream mapping fails with a
+ * NullPointerException, and a missing component mapping is rejected by the constructor with an
+ * IllegalArgumentException.
+ * @param remappedComponents old name to new name of components.
+ * @param remappedStreams old ID to new ID of streams.
+ * @return a modified version of this with names remapped.
+ */
+ public InputStream remap(Map<String, String> remappedComponents, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+ String remapTo = remappedComponents.get(toComponent);
+ String remapFrom = remappedComponents.get(fromComponent);
+ //The lookup key is the stream as it is currently named (fromComponent + id)
+ GlobalStreamId remapStreamId = remappedStreams.get(gsid());
+ //Only the stream id of the mapped GlobalStreamId is used, the component comes from remappedComponents
+ return new InputStream(remapFrom, remapTo, remapStreamId.get_streamId(), execTime, processTime, groupingType);
+ }
+
+ /**
+ * Replace all SHUFFLE groupings with LOCAL_OR_SHUFFLE.
+ * @return a modified copy of this
+ */
+ public InputStream replaceShuffleWithLocalOrShuffle() {
+ if (groupingType != GroupingType.SHUFFLE) {
+ return this;
+ }
+ return new InputStream(fromComponent, toComponent, id, execTime, processTime, GroupingType.LOCAL_OR_SHUFFLE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
new file mode 100644
index 0000000..e02e8f8
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bolt that simulates a real world bolt based off of statistics about it.
+ */
+public class LoadBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(LoadBolt.class);
+ private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
+ private final List<OutputStream> outputStreamStats;
+ private List<OutputStreamEngine> outputStreams;
+ private final Map<GlobalStreamId, InputStream> inputStreams = new HashMap<>();
+ private OutputCollector collector;
+ private Random rand;
+ private ScheduledExecutorService timer;
+
+ private static long toNano(double ms) {
+ return (long)(ms * NANO_IN_MS);
+ }
+
+ public LoadBolt(LoadCompConf conf) {
+ this.outputStreamStats = Collections.unmodifiableList(new ArrayList<>(conf.streams));
+ }
+
+ public void add(InputStream inputStream) {
+ GlobalStreamId id = inputStream.gsid();
+ inputStreams.put(id, inputStream);
+ }
+
+ /**
+  * Set up the per-task state: one OutputStreamEngine per configured output stream, the
+  * collector, the random source and the timer used to delay acks.
+  * @param topoConf the topology config (not used here).
+  * @param context the topology context (not used here).
+  * @param collector the collector used to emit and ack tuples.
+  */
+ @Override
+ public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+     outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
+         .map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
+     this.collector = collector;
+     this.rand = ThreadLocalRandom.current();
+     this.timer = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ /**
+  * Stop the timer created in prepare so its thread does not outlive the bolt.
+  *
+  * <p>The executor uses the default thread factory, so without this its non-daemon thread
+  * stays alive after the bolt is done. Any emits and acks that are still waiting on the timer
+  * are dropped.
+  */
+ @Override
+ public void cleanup() {
+     if (timer != null) {
+         timer.shutdownNow();
+         timer = null;
+     }
+ }
+
+ private final AtomicLong parkOffset = new AtomicLong(0);
+
+ /**
+ * Block the calling thread until roughly endTime.
+ *
+ * <p>The wait strategy depends on how far away the (offset adjusted) end time is: nothing at
+ * all for 1 microsecond or less, a busy loop for less than a millisecond, and
+ * LockSupport.parkNanos for anything longer. After every call parkOffset is adjusted by half
+ * of the observed error so that later calls aim closer to the requested time.
+ * @param endTime the System.nanoTime() value to wait for.
+ */
+ private void mySleep(long endTime) {
+ //There are some different levels of accuracy here, and we want to deal with all of them
+ long start = System.nanoTime();
+ //Aim early/late by the error measured on previous calls
+ long newEnd = endTime - parkOffset.get();
+ long diff = newEnd - start;
+ if (diff <= 1_000) {
+ //We are done, nothing that short is going to work here
+ } else if (diff < NANO_IN_MS) {
+ //Busy wait...
+ //The clock is only re-checked after each batch of 1,000,000 additions; sum is never read
+ long sum = 0;
+ while (System.nanoTime() < newEnd) {
+ for (long i = 0; i < 1_000_000; i++) {
+ sum += i;
+ }
+ }
+ } else {
+ //More accurate than Thread.sleep, but still not great
+ //NOTE(review): parkOffset is subtracted here a second time (newEnd already includes it) - confirm this is intended
+ LockSupport.parkNanos(newEnd - System.nanoTime() - parkOffset.get());
+ }
+ // A small control algorithm to adjust the amount of time that we sleep to make it more accurate
+ parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+ }
+
+ private void emitTuples(Tuple input) {
+ for (OutputStreamEngine se: outputStreams) {
+ // we may output many tuples for a given input tuple
+ while (se.shouldEmit() != null) {
+ collector.emit(se.streamName, input, new Values(se.nextKey(), "SOME-BOLT-VALUE"));
+ }
+ }
+ }
+
+ @Override
+ public void execute(final Tuple input) {
+ long startTimeNs = System.nanoTime();
+ InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
+ if (in == null) {
+ emitTuples(input);
+ collector.ack(input);
+ } else {
+ long endExecNs = startTimeNs + toNano(in.execTime.nextRandom(rand));
+ long endProcNs = startTimeNs + toNano(in.processTime.nextRandom(rand));
+
+ if ((endProcNs - 1_000_000) < endExecNs) {
+ mySleep(endProcNs);
+ emitTuples(input);
+ collector.ack(input);
+ } else {
+ timer.schedule(() -> {
+ emitTuples(input);
+ collector.ack(input);
+ }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
+ }
+
+ mySleep(endExecNs);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (OutputStream s: outputStreamStats) {
+ declarer.declareStream(s.id, new Fields("key", "value"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
new file mode 100644
index 0000000..baead0f
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * Configuration for a simulated component (spout or bolt).
+ */
+public class LoadCompConf {
+ public final String id;
+ public final int parallelism;
+ public final List<OutputStream> streams;
+ public final CompStats stats;
+
+ /**
+ * Parse the LoadCompConf from a config Map.
+ * @param conf the map holding the config for a LoadCompConf.
+ * @return the parsed object.
+ */
+ public static LoadCompConf fromConf(Map<String, Object> conf) {
+ String id = (String) conf.get("id");
+ int parallelism = ObjectReader.getInt(conf.get("parallelism"), 1);
+ List<OutputStream> streams = new ArrayList<>();
+ List<Map<String, Object>> streamData = (List<Map<String, Object>>) conf.get("streams");
+ if (streamData != null) {
+ for (Map<String, Object> streamInfo: streamData) {
+ streams.add(OutputStream.fromConf(streamInfo));
+ }
+ }
+
+ return new LoadCompConf(id, parallelism, streams, CompStats.fromConf(conf));
+ }
+
+ /**
+ * Build a config map for this object.
+ * @return the config map.
+ */
+ public Map<String, Object> toConf() {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("id", id);
+ ret.put("parallelism", parallelism);
+
+ if (streams != null) {
+ List<Map<String, Object>> streamData = new ArrayList<>();
+ for (OutputStream out : streams) {
+ streamData.add(out.toConf());
+ }
+ ret.put("streams", streamData);
+ }
+ if (stats != null) {
+ stats.addToConf(ret);
+ }
+ return ret;
+ }
+
+ /**
+ * Change the name of components and streams according to the parameters passed in.
+ * @param remappedComponents original component name to new component name.
+ * @param remappedStreams original stream id to new stream id.
+ * @return a copy of this with the values remapped.
+ */
+ public LoadCompConf remap(Map<String, String> remappedComponents, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+ String remappedId = remappedComponents.get(id);
+ List<OutputStream> remappedOutStreams = (streams == null) ? null :
+ streams.stream()
+ .map((orig) -> orig.remap(id, remappedStreams))
+ .collect(Collectors.toList());
+
+ return new LoadCompConf(remappedId, parallelism, remappedOutStreams, stats);
+ }
+
+ /**
+ * Scale the parallelism of this component by v. The aggregate throughput will be the same.
+ * The parallelism will be rounded up to the next largest whole number. Parallelism will always be at least 1.
+ * @param v 1.0 is no change, 0.5 drops the parallelism by half.
+ * @return a copy of this with the parallelism adjusted.
+ */
+ public LoadCompConf scaleParallel(double v) {
+ return setParallel(Math.max(1, (int)Math.ceil(parallelism * v)));
+ }
+
+ /**
+ * Set the parallelism of this component, and adjust the throughput so in aggregate it stays the same.
+ * @param newParallelism the new parallelism to set.
+ * @return a copy of this with the adjustments made.
+ */
+ public LoadCompConf setParallel(int newParallelism) {
+ //We need to adjust the throughput accordingly (so that it stays the same in aggregate)
+ double throughputAdjustment = ((double)parallelism) / newParallelism;
+ return new LoadCompConf(id, newParallelism, streams, stats).scaleThroughput(throughputAdjustment);
+ }
+
+ /**
+ * Scale the throughput of this component.
+ * @param v 1.0 is unchanged 0.5 will cut the throughput in half.
+ * @return a copy of this with the adjustments made.
+ */
+ public LoadCompConf scaleThroughput(double v) {
+ if (streams != null) {
+ List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
+ return new LoadCompConf(id, parallelism, newStreams, stats);
+ } else {
+ return this;
+ }
+ }
+
+ /**
+ * Compute the total amount of all messages emitted in all streams per second.
+ * @return the sum of all messages emitted per second.
+ */
+ public double getAllEmittedAggregate() {
+ double ret = 0;
+ if (streams != null) {
+ for (OutputStream out: streams) {
+ if (out.rate != null) {
+ ret += out.rate.mean * parallelism;
+ }
+ }
+ }
+ return ret;
+ }
+
+ public static class Builder {
+ private String id;
+ private int parallelism = 1;
+ private List<OutputStream> streams;
+ private CompStats stats;
+
+ public String getId() {
+ return id;
+ }
+
+ public Builder withId(String id) {
+ this.id = id;
+ return this;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public Builder withParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public List<OutputStream> getStreams() {
+ return streams;
+ }
+
+ /**
+ * Add in a single OutputStream to this component.
+ * @param stream the stream to add
+ * @return this
+ */
+ public Builder withStream(OutputStream stream) {
+ if (streams == null) {
+ streams = new ArrayList<>();
+ }
+ streams.add(stream);
+ return this;
+ }
+
+ public Builder withStreams(List<OutputStream> streams) {
+ this.streams = streams;
+ return this;
+ }
+
+ public CompStats getStats() {
+ return stats;
+ }
+
+ public Builder withStats(CompStats stats) {
+ this.stats = stats;
+ return this;
+ }
+
+ public LoadCompConf build() {
+ return new LoadCompConf(id, parallelism, streams, stats);
+ }
+ }
+
+ /**
+  * Create a new LoadCompConf with the given values.
+  * @param id the id of the component.
+  * @param parallelism the parallelism of the component.
+  * @param streams the output streams of the component, may be null.
+  * @param stats the stats of the component, may be null.
+  * @throws IllegalArgumentException if id is null.
+  */
+ public LoadCompConf(String id, int parallelism, List<OutputStream> streams, CompStats stats) {
+     if (id == null) {
+         //This config is used for bolts as well as spouts, so the message must not claim it is a spout.
+         throw new IllegalArgumentException("A component ID cannot be null");
+     }
+     this.id = id;
+     this.parallelism = parallelism;
+     this.streams = streams;
+     this.stats = stats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
new file mode 100644
index 0000000..9030379
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.loadgen.CompStats;
+
+/**
+ * The goal of this class is to provide a set of "Tuples" to send that will match as closely as possible the characteristics
+ * measured from a production topology.
+ */
+public class LoadEngine {
+
+ //TODO need to do a lot...
+
+ /**
+ * Provides an API to simulate the timings and CPU utilization of a bolt or spout.
+ */
+ public static class InputTimingEngine {
+ private final Random rand;
+ private final CompStats stats;
+
+ public InputTimingEngine(CompStats stats) {
+ this.stats = stats;
+ rand = ThreadLocalRandom.current();
+ }
+ }
+}