Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:45 UTC

[04/18] storm git commit: STORM-2702: storm-loadgen

STORM-2702: storm-loadgen


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c2dcbed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c2dcbed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c2dcbed

Branch: refs/heads/master
Commit: 6c2dcbedabb88970697f42b6f66bf64177e2ac9c
Parents: 0d10b8a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Aug 21 14:36:10 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Aug 21 14:36:10 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/pom.xml                  |  13 +-
 .../org/apache/storm/loadgen/CaptureLoad.java   | 341 ++++++++
 .../org/apache/storm/loadgen/CompStats.java     |  52 ++
 .../storm/loadgen/EstimateThroughput.java       | 108 +++
 .../java/org/apache/storm/loadgen/GenLoad.java  | 235 ++++++
 .../org/apache/storm/loadgen/GroupingType.java  |  91 +++
 .../loadgen/HttpForwardingMetricsConsumer.java  |  51 +-
 .../loadgen/HttpForwardingMetricsServer.java    |  79 +-
 .../org/apache/storm/loadgen/InputStream.java   | 263 +++++++
 .../java/org/apache/storm/loadgen/LoadBolt.java | 146 ++++
 .../org/apache/storm/loadgen/LoadCompConf.java  | 222 ++++++
 .../org/apache/storm/loadgen/LoadEngine.java    |  45 ++
 .../apache/storm/loadgen/LoadMetricsServer.java | 784 +++++++++++++++++++
 .../org/apache/storm/loadgen/LoadSpout.java     | 137 ++++
 .../apache/storm/loadgen/NormalDistStats.java   | 151 ++++
 .../org/apache/storm/loadgen/OutputStream.java  | 121 +++
 .../storm/loadgen/OutputStreamEngine.java       | 122 +++
 .../apache/storm/loadgen/ScopedTopologySet.java |  91 +++
 .../storm/loadgen/ThroughputVsLatency.java      | 405 ++++------
 .../apache/storm/loadgen/TopologyLoadConf.java  | 432 ++++++++++
 .../apache/storm/loadgen/LoadCompConfTest.java  |  57 ++
 .../storm/loadgen/LoadMetricsServerTest.java    |  36 +
 .../storm/loadgen/NormalDistStatsTest.java      |  43 +
 .../apache/storm/loadgen/OutputStreamTest.java  |  37 +
 24 files changed, 3724 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/pom.xml b/examples/storm-loadgen/pom.xml
index f75e575..1b2e4f7 100644
--- a/examples/storm-loadgen/pom.xml
+++ b/examples/storm-loadgen/pom.xml
@@ -41,14 +41,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-server</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>storm-client</artifactId>
       <version>${project.version}</version>
@@ -60,6 +52,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     </dependency>
     <dependency>
       <groupId>org.apache.storm</groupId>
+      <artifactId>storm-client-misc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
       <artifactId>storm-metrics</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
new file mode 100644
index 0000000..e748efa
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.utils.NimbusClient;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Capture running topologies for load gen later on.
+ */
+public class CaptureLoad {
+    private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
+    public static final String DEFAULT_OUT_DIR = "./loadgen/";
+
+    private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
+                                                  GlobalStreamId id,
+                                                  Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
+
+        List<Double> ret = new ArrayList<>();
+        if (summaries != null) {
+            for (ExecutorSummary summ : summaries) {
+                if (summ != null && summ.is_set_stats()) {
+                    Map<String, Map<GlobalStreamId, Double>> data = func.apply(summ.get_stats().get_specific().get_bolt());
+                    if (data != null) {
+                        List<Double> subvalues = data.values().stream()
+                            .map((subMap) -> subMap.get(id))
+                            .filter((value) -> value != null)
+                            .mapToDouble((value) -> value.doubleValue())
+                            .boxed().collect(Collectors.toList());
+                        ret.addAll(subvalues);
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
+    static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
+        String topologyName = topologySummary.get_name();
+        LOG.info("Capturing {}...", topologyName);
+        String topologyId = topologySummary.get_id();
+        TopologyInfo info = client.getTopologyInfo(topologyId);
+        TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
+        @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+        StormTopology topo = client.getUserTopology(topologyId);
+        //Done capturing topology information...
+
+        Map<String, Object> savedTopoConf = new HashMap<>();
+        Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
+        for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) {
+            Object o = topoConf.get(key);
+            if (o != null) {
+                savedTopoConf.put(key, o);
+                LOG.info("with config {}: {}", key, o);
+            }
+        }
+        //Let's use the number of actually scheduled workers as a way to bridge RAS and non-RAS
+        int numWorkers = tpinfo.get_num_workers();
+        if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) {
+            numWorkers = Math.max(numWorkers, ((Number)savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue());
+        }
+        savedTopoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
+
+        Map<String, LoadCompConf.Builder> boltBuilders = new HashMap<>();
+        Map<String, LoadCompConf.Builder> spoutBuilders = new HashMap<>();
+        List<InputStream.Builder> inputStreams = new ArrayList<>();
+        Map<GlobalStreamId, OutputStream.Builder> outStreams = new HashMap<>();
+
+        //Bolts
+        if (topo.get_bolts() != null) {
+            for (Map.Entry<String, Bolt> boltSpec : topo.get_bolts().entrySet()) {
+                String boltComp = boltSpec.getKey();
+                LOG.info("Found bolt {}...", boltComp);
+                Bolt bolt = boltSpec.getValue();
+                ComponentCommon common = bolt.get_common();
+                Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
+                if (inputs != null) {
+                    for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
+                        GlobalStreamId id = input.getKey();
+                        LOG.info("with input {}...", id);
+                        Grouping grouping = input.getValue();
+                        InputStream.Builder builder = new InputStream.Builder()
+                            .withId(id.get_streamId())
+                            .withFromComponent(id.get_componentId())
+                            .withToComponent(boltComp)
+                            .withGroupingType(grouping);
+                        inputStreams.add(builder);
+                    }
+                }
+                Map<String, StreamInfo> outputs = common.get_streams();
+                if (outputs != null) {
+                    for (String name : outputs.keySet()) {
+                        GlobalStreamId id = new GlobalStreamId(boltComp, name);
+                        LOG.info("and output {}...", id);
+                        OutputStream.Builder builder = new OutputStream.Builder()
+                            .withId(name);
+                        outStreams.put(id, builder);
+                    }
+                }
+                LoadCompConf.Builder builder = new LoadCompConf.Builder()
+                    .withParallelism(common.get_parallelism_hint())
+                    .withId(boltComp);
+                boltBuilders.put(boltComp, builder);
+            }
+        }
+
+        //Spouts
+        if (topo.get_spouts() != null) {
+            for (Map.Entry<String, SpoutSpec> spoutSpec : topo.get_spouts().entrySet()) {
+                String spoutComp = spoutSpec.getKey();
+                LOG.info("Found Spout {}...", spoutComp);
+                SpoutSpec spout = spoutSpec.getValue();
+                ComponentCommon common = spout.get_common();
+
+                Map<String, StreamInfo> outputs = common.get_streams();
+                if (outputs != null) {
+                    for (String name : outputs.keySet()) {
+                        GlobalStreamId id = new GlobalStreamId(spoutComp, name);
+                        LOG.info("with output {}...", id);
+                        OutputStream.Builder builder = new OutputStream.Builder()
+                            .withId(name);
+                        outStreams.put(id, builder);
+                    }
+                }
+                LoadCompConf.Builder builder = new LoadCompConf.Builder()
+                    .withParallelism(common.get_parallelism_hint())
+                    .withId(spoutComp);
+                spoutBuilders.put(spoutComp, builder);
+            }
+        }
+
+        //Stats...
+        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
+        for (ExecutorSummary executor: info.get_executors()) {
+            String component = executor.get_component_id();
+            List<ExecutorSummary> list = byComponent.get(component);
+            if (list == null) {
+                list = new ArrayList<>();
+                byComponent.put(component, list);
+            }
+            list.add(executor);
+        }
+
+        List<InputStream> streams = new ArrayList<>(inputStreams.size());
+        //Compute the stats for the different input streams
+        for (InputStream.Builder builder : inputStreams) {
+            GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
+            List<ExecutorSummary> summaries = byComponent.get(builder.getToComponent());
+            //Execute and process latency...
+            builder.withProcessTime(new NormalDistStats(
+                extractBoltValues(summaries, streamId, BoltStats::get_process_ms_avg)));
+            builder.withExecTime(new NormalDistStats(
+                extractBoltValues(summaries, streamId, BoltStats::get_execute_ms_avg)));
+            //InputStream is done
+            streams.add(builder.build());
+        }
+
+        //There is a bug in some versions that returns 0 for the uptime.
+        // To work around it we get it in an alternative (working) way.
+        Map<String, Integer> workerToUptime = new HashMap<>();
+        for (WorkerSummary ws : tpinfo.get_workers()) {
+            workerToUptime.put(ws.get_supervisor_id() + ":" + ws.get_port(), ws.get_uptime_secs());
+        }
+        LOG.debug("WORKER TO UPTIME {}", workerToUptime);
+
+        for (Map.Entry<GlobalStreamId, OutputStream.Builder> entry : outStreams.entrySet()) {
+            OutputStream.Builder builder = entry.getValue();
+            GlobalStreamId id = entry.getKey();
+            List<Double> emittedRate = new ArrayList<>();
+            List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
+            if (summaries != null) {
+                for (ExecutorSummary summary: summaries) {
+                    if (summary.is_set_stats()) {
+                        int uptime = summary.get_uptime_secs();
+                        LOG.debug("UPTIME {}", uptime);
+                        if (uptime <= 0) {
+                            //Likely it is because of a bug, so try to get it another way
+                            String key = summary.get_host() + ":" + summary.get_port();
+                            uptime = workerToUptime.getOrDefault(key, 1);
+                            LOG.debug("Getting uptime for worker {}, {}", key, uptime);
+                        }
+                        for (Map.Entry<String, Map<String, Long>> statEntry : summary.get_stats().get_emitted().entrySet()) {
+                            String timeWindow = statEntry.getKey();
+                            long timeSecs = uptime;
+                            try {
+                                timeSecs = Long.valueOf(timeWindow);
+                            } catch (NumberFormatException e) {
+                                //Ignored...
+                            }
+                            timeSecs = Math.min(timeSecs, uptime);
+                            Long count = statEntry.getValue().get(id.get_streamId());
+                            if (count != null) {
+                                LOG.debug("{} emitted {} for {} secs or {} tuples/sec",
+                                    id, count, timeSecs, count.doubleValue() / timeSecs);
+                                emittedRate.add(count.doubleValue() / timeSecs);
+                            }
+                        }
+                    }
+                }
+            }
+            builder.withRate(new NormalDistStats(emittedRate));
+            //TODO to know if the output keys are skewed we have to guess by looking
+            // at the downstream executed stats, but for now we are going to ignore it
+
+            //The OutputStream is done
+            LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
+            if (comp == null) {
+                comp = spoutBuilders.get(id.get_componentId());
+            }
+            comp.withStream(builder.build());
+        }
+
+        List<LoadCompConf> spouts = spoutBuilders.values().stream()
+            .map((b) -> b.build())
+            .collect(Collectors.toList());
+
+        List<LoadCompConf> bolts = boltBuilders.values().stream()
+            .map((b) -> b.build())
+            .collect(Collectors.toList());
+
+        return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
+    }
+
+    /**
+     * Main entry point for CaptureLoad command.
+     * @param args the arguments to the command
+     * @throws Exception on any error
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        options.addOption(Option.builder("a")
+            .longOpt("anonymize")
+            .desc("Strip out any possibly identifiable information")
+            .build());
+        options.addOption(Option.builder("o")
+            .longOpt("output-dir")
+            .argName("<file>")
+            .hasArg()
+            .desc("Where to write (defaults to " + DEFAULT_OUT_DIR + ")")
+            .build());
+        options.addOption(Option.builder("h")
+            .longOpt("help")
+            .desc("Print a help message")
+            .build());
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmd = null;
+        ParseException pe = null;
+        try {
+            cmd = parser.parse(options, args);
+        } catch (ParseException e) {
+            pe = e;
+        }
+        if (pe != null || cmd.hasOption('h')) {
+            if (pe != null) {
+                System.err.println("ERROR " + pe.getMessage());
+            }
+            new HelpFormatter().printHelp("CaptureLoad [options] [topologyName]*", options);
+            return;
+        }
+
+        Config conf = new Config();
+        int exitStatus = -1;
+        String outputDir = DEFAULT_OUT_DIR;
+        if (cmd.hasOption('o')) {
+            outputDir = cmd.getOptionValue('o');
+        }
+        File baseOut = new File(outputDir);
+        LOG.info("Will save captured topologies to {}", baseOut);
+        baseOut.mkdirs();
+
+        try (NimbusClient nc = NimbusClient.getConfiguredClient(conf)) {
+            Nimbus.Iface client = nc.getClient();
+            List<String> topologyNames = cmd.getArgList();
+
+            ClusterSummary clusterSummary = client.getClusterInfo();
+            for (TopologySummary topologySummary: clusterSummary.get_topologies()) {
+                if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
+                    TopologyLoadConf capturedConf = captureTopology(client, topologySummary);
+                    if (cmd.hasOption('a')) {
+                        capturedConf = capturedConf.anonymize();
+                    }
+                    capturedConf.writeTo(new File(baseOut, capturedConf.name + ".yaml"));
+                }
+            }
+
+            exitStatus = 0;
+        } catch (Exception e) {
+            LOG.error("Error trying to capture topologies...", e);
+        } finally {
+            System.exit(exitStatus);
+        }
+    }
+}
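
As a usage note: main() above iterates every topology in the cluster and writes one YAML file per capture. A minimal sketch of driving the same capture programmatically (from within the same package, imports as in CaptureLoad.java above; the output directory is an assumption):

```java
// Sketch only: mirrors what CaptureLoad.main() does, minus the CLI parsing.
try (NimbusClient nc = NimbusClient.getConfiguredClient(new Config())) {
    Nimbus.Iface client = nc.getClient();
    for (TopologySummary ts : client.getClusterInfo().get_topologies()) {
        TopologyLoadConf captured = CaptureLoad.captureTopology(client, ts);
        // captured.anonymize() would strip identifiable names, like the -a flag
        captured.writeTo(new File("./loadgen/", captured.name + ".yaml"));
    }
}
```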

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
new file mode 100644
index 0000000..d0e0bd3
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CompStats.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * A set of measurements about a component (bolt/spout) so we can statistically reproduce it.
+ */
+public class CompStats implements Serializable {
+    public final double cpuPercent; // Right now we don't have a good way to measure any kind of a distribution; this is all approximate
+    public final double memoryMb; //again no good way to get a distribution...
+
+    /**
+     * Parse out a CompStats from a config map.
+     * @param conf the map holding the CompStats values
+     * @return the parsed CompStats
+     */
+    public static CompStats fromConf(Map<String, Object> conf) {
+        double cpu = ObjectReader.getDouble(conf.get("cpuPercent"), 0.0);
+        double memory = ObjectReader.getDouble(conf.get("memoryMb"), 0.0);
+        return new CompStats(cpu, memory);
+    }
+
+    public void addToConf(Map<String, Object> ret) {
+        ret.put("cpuPercent", cpuPercent);
+        ret.put("memoryMb", memoryMb);
+    }
+
+    public CompStats(double cpuPercent, double memoryMb) {
+        this.cpuPercent = cpuPercent;
+        this.memoryMb = memoryMb;
+    }
+}
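
Since CompStats round-trips through a plain config map, a quick sketch of the symmetry between addToConf and fromConf (the numbers are made up for illustration):

```java
Map<String, Object> conf = new HashMap<>();
new CompStats(150.0, 512.0).addToConf(conf); // writes "cpuPercent" and "memoryMb"
CompStats copy = CompStats.fromConf(conf);   // reads them back (defaulting to 0.0 if absent)
assert copy.cpuPercent == 150.0 && copy.memoryMb == 512.0;
```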

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
new file mode 100644
index 0000000..80ede37
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.loadgen.CaptureLoad;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Estimate the throughput of all topologies.
+ */
+public class EstimateThroughput {
+    private static final Logger LOG = LoggerFactory.getLogger(EstimateThroughput.class);
+
+    /**
+     * Main entry point for estimate throughput command.
+     * @param args the command line arguments.
+     * @throws Exception on any error.
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        options.addOption(Option.builder("h")
+            .longOpt("help")
+            .desc("Print a help message")
+            .build());
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmd = null;
+        ParseException pe = null;
+        try {
+            cmd = parser.parse(options, args);
+        } catch (ParseException e) {
+            pe = e;
+        }
+        if (pe != null || cmd.hasOption('h')) {
+            if (pe != null) {
+                System.err.println("ERROR " + pe.getMessage());
+            }
+            new HelpFormatter().printHelp("EstimateThroughput [options] [topologyName]*", options);
+            return;
+        }
+
+        Config conf = new Config();
+        int exitStatus = -1;
+
+        List<TopologyLoadConf> regular = new ArrayList<>();
+        List<TopologyLoadConf> trident = new ArrayList<>();
+
+        try (NimbusClient nc = NimbusClient.getConfiguredClient(conf)) {
+            Nimbus.Iface client = nc.getClient();
+            List<String> topologyNames = cmd.getArgList();
+
+            ClusterSummary clusterSummary = client.getClusterInfo();
+            for (TopologySummary topologySummary: clusterSummary.get_topologies()) {
+                if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
+                    TopologyLoadConf capturedConf = CaptureLoad.captureTopology(client, topologySummary);
+                    if (capturedConf.looksLikeTrident()) {
+                        trident.add(capturedConf);
+                    } else {
+                        regular.add(capturedConf);
+                    }
+                }
+            }
+
+            System.out.println("TOPOLOGY\tTOTAL MESSAGES/sec\tESTIMATED INPUT MESSAGES/sec");
+            for (TopologyLoadConf tl: regular) {
+                System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getSpoutEmittedAggregate());
+            }
+            for (TopologyLoadConf tl: trident) {
+                System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getTridentEstimatedEmittedAggregate());
+            }
+            exitStatus = 0;
+        } catch (Exception e) {
+            LOG.error("Error trying to capture topologies...", e);
+        } finally {
+            System.exit(exitStatus);
+        }
+    }
+}
\ No newline at end of file
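
For reference, the report is one tab-separated line per topology; a hypothetical run against a single non-trident topology might print something like:

```
TOPOLOGY	TOTAL MESSAGES/sec	ESTIMATED INPUT MESSAGES/sec
wordcount	12500.0	2500.0
```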

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
new file mode 100644
index 0000000..7998fdc
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generate a simulated load.
+ */
+public class GenLoad {
+    private static final Logger LOG = LoggerFactory.getLogger(GenLoad.class);
+    private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
+
+    /**
+     * Main entry point for GenLoad application.
+     * @param args the command line args.
+     * @throws Exception on any error.
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        options.addOption(Option.builder("h")
+            .longOpt("help")
+            .desc("Print a help message")
+            .build());
+        options.addOption(Option.builder("t")
+            .longOpt("test-time")
+            .argName("MINS")
+            .hasArg()
+            .desc("How long to run the tests for in mins (defaults to " + TEST_EXECUTE_TIME_DEFAULT + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("parallel")
+            .argName("MULTIPLIER")
+            .hasArg()
+            .desc("How much to scale the topology up or down in parallelism.\n"
+                + "The new parallelism will round up to the next whole number\n"
+                + "(defaults to 1.0 no scaling)")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("throughput")
+            .argName("MULTIPLIER")
+            .hasArg()
+            .desc("How much to scale the topology up or down in throughput.\n"
+                + "Note this is applied after and build on any parallelism changes.\n"
+                + "(defaults to 1.0 no scaling)")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("local-or-shuffle")
+            .desc("replace shuffle grouping with local or shuffle grouping")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("debug")
+            .desc("Print debug information about the adjusted topology before submitting it.")
+            .build());
+        LoadMetricsServer.addCommandLineOptions(options);
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmd = null;
+        Exception commandLineException = null;
+        double executeTime = TEST_EXECUTE_TIME_DEFAULT;
+        double parallel = 1.0;
+        double throughput = 1.0;
+        try {
+            cmd = parser.parse(options, args);
+            if (cmd.hasOption("t")) {
+                executeTime = Double.valueOf(cmd.getOptionValue("t"));
+            }
+            if (cmd.hasOption("parallel")) {
+                parallel = Double.parseDouble(cmd.getOptionValue("parallel"));
+            }
+            if (cmd.hasOption("throughput")) {
+                throughput = Double.parseDouble(cmd.getOptionValue("throughput"));
+            }
+        } catch (ParseException | NumberFormatException e) {
+            commandLineException = e;
+        }
+        if (commandLineException != null || cmd.hasOption('h')) {
+            if (commandLineException != null) {
+                System.err.println("ERROR " + commandLineException.getMessage());
+            }
+            new HelpFormatter().printHelp("GenLoad [options] [captured_file]*", options);
+            return;
+        }
+        Config conf = new Config();
+        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+
+        metricServer.serve();
+        String url = metricServer.getUrl();
+        int exitStatus = -1;
+        try (NimbusClient client = NimbusClient.getConfiguredClient(conf);
+             ScopedTopologySet topoNames = new ScopedTopologySet(client.getClient())) {
+            for (String topoFile : cmd.getArgList()) {
+                try {
+                    TopologyLoadConf tlc = readTopology(topoFile);
+                    if (parallel != 1.0) {
+                        tlc = tlc.scaleParallel(parallel);
+                    }
+                    if (throughput != 1.0) {
+                        tlc = tlc.scaleThroughput(throughput);
+                    }
+                    if (cmd.hasOption("local-or-shuffle")) {
+                        tlc = tlc.replaceShuffleWithLocalOrShuffle();
+                    }
+                    if (cmd.hasOption("debug")) {
+                        LOG.info("DEBUGGING: {}", tlc.toYamlString());
+                    }
+                    topoNames.add(parseAndSubmit(tlc, url));
+                } catch (Exception e) {
+                    System.err.println("Could Not Submit Topology From " + topoFile);
+                    e.printStackTrace(System.err);
+                }
+            }
+
+            metricServer.monitorFor(executeTime, client.getClient(), topoNames);
+            exitStatus = 0;
+        } catch (Exception e) {
+            LOG.error("Error trying to run topologies...", e);
+        } finally {
+            System.exit(exitStatus);
+        }
+    }
+
+    private static TopologyLoadConf readTopology(String topoFile) throws IOException {
+        File f = new File(topoFile);
+
+        TopologyLoadConf tlc = TopologyLoadConf.fromConf(f);
+        if (tlc.name == null) {
+            String fileName = f.getName();
+            int dot = fileName.lastIndexOf('.');
+            final String baseName = fileName.substring(0, dot);
+            tlc = tlc.withName(baseName);
+        }
+        return tlc;
+    }
+
+    static int uniquifier = 0;
+
+    private static String parseAndSubmit(TopologyLoadConf tlc, String url) throws IOException, InvalidTopologyException,
+        AuthorizationException, AlreadyAliveException {
+
+        //First we need some configs
+        Config conf = new Config();
+        if (tlc.topoConf != null) {
+            conf.putAll(tlc.topoConf);
+        }
+        //For some reason in the new code, if ackers is null we get 0???
+        Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+        Object workers = conf.get(Config.TOPOLOGY_WORKERS);
+        if (ackers == null || ((Number)ackers).intValue() <= 0) {
+            if (workers == null) {
+                workers = 1;
+            }
+            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, workers);
+        }
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+        conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
+        Map<String, String> workerMetrics = new HashMap<>();
+        if (!NimbusClient.isLocalOverride()) {
+            //sigar uses JNI and does not work in local mode
+            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
+        }
+        conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
+        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
+
+        //Lets build a topology.
+        TopologyBuilder builder = new TopologyBuilder();
+        for (LoadCompConf spoutConf : tlc.spouts) {
+            System.out.println("ADDING SPOUT " + spoutConf.id);
+            builder.setSpout(spoutConf.id, new LoadSpout(spoutConf), spoutConf.parallelism);
+        }
+
+        Map<String, BoltDeclarer> boltDeclarers = new HashMap<>();
+        Map<String, LoadBolt> bolts = new HashMap<>();
+        if (tlc.bolts != null) {
+            for (LoadCompConf boltConf : tlc.bolts) {
+                System.out.println("ADDING BOLT " + boltConf.id);
+                LoadBolt lb = new LoadBolt(boltConf);
+                bolts.put(boltConf.id, lb);
+                boltDeclarers.put(boltConf.id, builder.setBolt(boltConf.id, lb, boltConf.parallelism));
+            }
+        }
+
+        if (tlc.streams != null) {
+            for (InputStream in : tlc.streams) {
+                BoltDeclarer declarer = boltDeclarers.get(in.toComponent);
+                if (declarer == null) {
+                    throw new IllegalArgumentException("to bolt " + in.toComponent + " does not exist");
+                }
+                LoadBolt lb = bolts.get(in.toComponent);
+                lb.add(in);
+                in.groupingType.assign(declarer, in);
+            }
+        }
+
+        String topoName = tlc.name + "-" + uniquifier++;
+        StormSubmitter.submitTopology(topoName, conf, builder.createTopology());
+        return topoName;
+    }
+}
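
The scaling options compose in a fixed order: parallelism is applied first and throughput is applied on top of it, per the --throughput help text above. A small sketch of the same transformations applied directly (the file name is hypothetical; the methods are the ones used in main()):

```java
TopologyLoadConf tlc = TopologyLoadConf.fromConf(new File("captured.yaml"));
tlc = tlc.scaleParallel(2.0);   // double each component's parallelism, rounding up
tlc = tlc.scaleThroughput(0.5); // then halve the emit rates of the scaled topology
tlc = tlc.replaceShuffleWithLocalOrShuffle(); // same effect as --local-or-shuffle
```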

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
new file mode 100644
index 0000000..a4e0c1a
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GroupingType.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Locale;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * The different types of groupings that are supported.
+ */
+public enum GroupingType {
+    SHUFFLE {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.shuffleGrouping(stream.fromComponent, stream.id);
+        }
+    },
+    FIELDS {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.fieldsGrouping(stream.fromComponent, stream.id, new Fields("key"));
+        }
+    },
+    ALL {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.allGrouping(stream.fromComponent, stream.id);
+        }
+    },
+    GLOBAL {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.globalGrouping(stream.fromComponent, stream.id);
+        }
+    },
+    LOCAL_OR_SHUFFLE {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.localOrShuffleGrouping(stream.fromComponent, stream.id);
+        }
+    },
+    NONE {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.noneGrouping(stream.fromComponent, stream.id);
+        }
+    },
+    PARTIAL_KEY {
+        @Override
+        public void assign(BoltDeclarer declarer, InputStream stream) {
+            declarer.partialKeyGrouping(stream.fromComponent, stream.id, new Fields("key"));
+        }
+    };
+
+    /**
+     * Parse a String config value and convert it into the enum.
+     * @param conf the string config.
+     * @return the parsed grouping type or SHUFFLE if conf is null.
+     * @throws IllegalArgumentException if parsing does not work.
+     */
+    public static GroupingType fromConf(String conf) {
+        String gt = "SHUFFLE";
+        if (conf != null) {
+            gt = conf.toUpperCase(Locale.ENGLISH);
+        }
+        return GroupingType.valueOf(gt);
+    }
+
+    public String toConf() {
+        return toString();
+    }
+
+    public abstract void assign(BoltDeclarer declarer, InputStream stream);
+}
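
To make the dispatch concrete: each enum constant binds a stream to the matching BoltDeclarer call, and fromConf is the case-insensitive inverse of toConf. A short sketch, with declarer and stream assumed to be in scope:

```java
GroupingType gt = GroupingType.fromConf("local_or_shuffle"); // null would default to SHUFFLE
gt.assign(declarer, stream); // calls declarer.localOrShuffleGrouping(stream.fromComponent, stream.id)
assert "LOCAL_OR_SHUFFLE".equals(gt.toConf());
```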

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
index aa4579c..5829e9d 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsConsumer.java
@@ -15,49 +15,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.misc.metric;
 
+package org.apache.storm.loadgen;
+
+import com.esotericsoftware.kryo.io.Output;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
-import java.net.URL;
-import java.net.HttpURLConnection;
-
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.storm.serialization.KryoValuesSerializer;
-
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.serialization.KryoValuesSerializer;
 import org.apache.storm.task.IErrorReporter;
 import org.apache.storm.task.TopologyContext;
 
 /**
- * Listens for all metrics and POSTs them serialized to a configured URL
- *
- * To use, add this to your topology's configuration:
+ * Listens for all metrics and POSTs them serialized to a configured URL.
  *
+ * <p>To use, add this to your topology's configuration:
  * ```java
- *   conf.registerMetricsConsumer(org.apache.storm.metrics.HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
+ *   conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, "http://example.com:8080/metrics/my-topology/", 1);
  * ```
  *
- * The body of the post is data serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
- * as a list of `[TaskInfo, Collection<DataPoint>]`.  More things may be appended to the end of the list in the future.
+ * <p>The body of the post is data serialized using {@link org.apache.storm.serialization.KryoValuesSerializer}, with the data passed in
+ * as a list of `[TaskInfo, Collection&lt;DataPoint&gt;]`.  More things may be appended to the end of the list in the future.
  *
- * The values can be deserialized using the org.apache.storm.serialization.KryoValuesDeserializer, and a 
- * correct config + classpath.
+ * <p>The values can be deserialized using the org.apache.storm.serialization.KryoValuesDeserializer, and a correct config + classpath.
  *
- * @see org.apache.storm.serialization.KryoValuesSerializer
+ * <p>@see org.apache.storm.serialization.KryoValuesSerializer
  */
 public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
-    private transient URL _url; 
-    private transient IErrorReporter _errorReporter;
-    private transient KryoValuesSerializer _serializer;
+    private transient URL url;
+    private transient IErrorReporter errorReporter;
+    private transient KryoValuesSerializer serializer;
 
     @Override
     public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { 
         try {
-            _url = new URL((String)registrationArgument);
-            _errorReporter = errorReporter;
-            _serializer = new KryoValuesSerializer(topoConf);
+            url = new URL((String)registrationArgument);
+            this.errorReporter = errorReporter;
+            serializer = new KryoValuesSerializer(topoConf);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -66,13 +63,13 @@ public class HttpForwardingMetricsConsumer implements IMetricsConsumer {
     @Override
     public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
         try {
-            HttpURLConnection con = (HttpURLConnection)_url.openConnection();
+            HttpURLConnection con = (HttpURLConnection) url.openConnection();
             con.setRequestMethod("POST");
             con.setDoOutput(true);
-            Output out = new Output(con.getOutputStream());
-            _serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
-            out.flush();
-            out.close();
+            try (Output out = new Output(con.getOutputStream())) {
+                serializer.serializeInto(Arrays.asList(taskInfo, dataPoints), out);
+                out.flush();
+            }
             //The connection is not sent unless a response is requested
             int response = con.getResponseCode();
         } catch (Exception e) {
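
The receiving side of that POST body is symmetric (it is what HttpForwardingMetricsServer below implements); a sketch of the deserialization, assuming the same topology config and a servlet request are in scope:

```java
KryoValuesDeserializer des = new KryoValuesDeserializer(topoConf);
List<Object> metrics = des.deserializeFrom(new Input(request.getInputStream()));
TaskInfo taskInfo = (TaskInfo) metrics.get(0);
Collection<DataPoint> dataPoints = (Collection<DataPoint>) metrics.get(1);
```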

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
index ef2769a..99a980b 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/HttpForwardingMetricsServer.java
@@ -15,27 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.misc.metric;
 
+package org.apache.storm.loadgen;
+
+import com.esotericsoftware.kryo.io.Input;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.util.Collection;
-import java.util.Map;
 import java.util.List;
-import java.net.ServerSocket;
-import java.net.InetAddress;
-
+import java.util.Map;
+import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.servlet.ServletException;
-
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
 import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-
-import com.esotericsoftware.kryo.io.Input;
+import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
 import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.utils.Utils;
-
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -44,64 +41,72 @@ import org.eclipse.jetty.servlet.ServletHolder;
  * A server that can listen for metrics from the HttpForwardingMetricsConsumer.
  */
 public abstract class HttpForwardingMetricsServer {
-    private Map _conf;
-    private Server _server = null;
-    private int _port = -1;
-    private String _url = null;
+    private Map conf;
+    private Server server = null;
+    private int port = -1;
+    private String url = null;
 
-    ThreadLocal<KryoValuesDeserializer> _des = new ThreadLocal<KryoValuesDeserializer>() {
+    ThreadLocal<KryoValuesDeserializer> des = new ThreadLocal<KryoValuesDeserializer>() {
         @Override
         protected KryoValuesDeserializer initialValue() {
-            return new KryoValuesDeserializer(_conf);
+            return new KryoValuesDeserializer(conf);
         }
     };
 
-    private class MetricsCollectionServlet extends HttpServlet
-    {
-        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
-        {
+    private class MetricsCollectionServlet extends HttpServlet {
+        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
             Input in = new Input(request.getInputStream());
-            List<Object> metrics = _des.get().deserializeFrom(in);
+            List<Object> metrics = des.get().deserializeFrom(in);
             handle((TaskInfo)metrics.get(0), (Collection<DataPoint>)metrics.get(1));
             response.setStatus(HttpServletResponse.SC_OK);
         }
     }
 
+    /**
+     * Constructor.
+     * @param conf the configuration for storm.
+     */
     public HttpForwardingMetricsServer(Map<String, Object> conf) {
-        _conf = Utils.readStormConfig();
+        this.conf = Utils.readStormConfig();
         if (conf != null) {
-            _conf.putAll(conf);
+            this.conf.putAll(conf);
         }
     }
 
     //This needs to be thread safe
     public abstract void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
 
+    /**
+     * Start the server.
+     * @param port the port it should listen on, or null or <= 0 to pick a free ephemeral port.
+     */
     public void serve(Integer port) {
         try {
-            if (_server != null) throw new RuntimeException("The server is already running");
+            if (server != null) {
+                throw new RuntimeException("The server is already running");
+            }
     
             if (port == null || port <= 0) {
                 ServerSocket s = new ServerSocket(0);
                 port = s.getLocalPort();
                 s.close();
             }
-            _server = new Server(port);
-            _port = port;
-            _url = "http://"+InetAddress.getLocalHost().getHostName()+":"+_port+"/";
+            server = new Server(port);
+            this.port = port;
+            url = "http://" + InetAddress.getLocalHost().getHostName() + ":" + this.port + "/";
  
             ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
             context.setContextPath("/");
-            _server.setHandler(context);
+            server.setHandler(context);
  
             context.addServlet(new ServletHolder(new MetricsCollectionServlet()),"/*");
 
-            _server.start();
-         } catch (RuntimeException e) {
-             throw e;
-         } catch (Exception e) {
-             throw new RuntimeException(e);
-         }
+            server.start();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void serve() {
@@ -109,10 +114,10 @@ public abstract class HttpForwardingMetricsServer {
     }
 
     public int getPort() {
-        return _port;
+        return port;
     }
 
     public String getUrl() {
-        return _url;
+        return url;
     }
 }
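
A minimal sketch of a concrete server, wired the way GenLoad uses it: implement the thread-safe handle() callback, start on an ephemeral port, and hand getUrl() to the consumer registration (the TaskInfo/DataPoint fields are the public ones from IMetricsConsumer; conf is assumed in scope):

```java
HttpForwardingMetricsServer server = new HttpForwardingMetricsServer(null) {
    @Override
    public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint dp : dataPoints) {
            System.out.println(taskInfo.srcComponentId + " " + dp.name + " = " + dp.value);
        }
    }
};
server.serve(); // the no-arg form picks a free ephemeral port
conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, server.getUrl(), 1);
```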

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
new file mode 100644
index 0000000..19802d9
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce it.
+ */
+public class InputStream implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
+    public final String fromComponent;
+    public final String toComponent;
+    public final String id;
+    public final NormalDistStats execTime;
+    public final NormalDistStats processTime;
+    public final GroupingType groupingType;
+    //Cached GlobalStreamId
+    private GlobalStreamId gsid = null;
+
+    /**
+     * Create an input stream from a config.
+     * @param conf the config to read from.
+     * @return the read InputStream.
+     */
+    public static InputStream fromConf(Map<String, Object> conf) {
+        String component = (String) conf.get("from");
+        String toComp = (String) conf.get("to");
+        NormalDistStats execTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("execTime"));
+        NormalDistStats processTime = NormalDistStats.fromConf((Map<String, Object>) conf.get("processTime"));
+        Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
+        GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
+        String streamId = (String) grouping.getOrDefault("streamId", "default");
+        return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
+    }
+
+    /**
+     * Convert this to a conf.
+     * @return the conf.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("from", fromComponent);
+        ret.put("to", toComponent);
+        ret.put("execTime", execTime.toConf());
+        ret.put("processTime", processTime.toConf());
+
+        Map<String, Object> grouping = new HashMap<>();
+        grouping.put("streamId", id);
+        grouping.put("type", groupingType.toConf());
+        ret.put("grouping", grouping);
+
+        return ret;
+    }
+
+    public static class Builder {
+        private String fromComponent;
+        private String toComponent;
+        private String id;
+        private NormalDistStats execTime;
+        private NormalDistStats processTime;
+        private GroupingType groupingType = GroupingType.SHUFFLE;
+
+        public String getFromComponent() {
+            return fromComponent;
+        }
+
+        public Builder withFromComponent(String fromComponent) {
+            this.fromComponent = fromComponent;
+            return this;
+        }
+
+        public String getToComponent() {
+            return toComponent;
+        }
+
+        public Builder withToComponent(String toComponent) {
+            this.toComponent = toComponent;
+            return this;
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        public Builder withId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        public NormalDistStats getExecTime() {
+            return execTime;
+        }
+
+        public Builder withExecTime(NormalDistStats execTime) {
+            this.execTime = execTime;
+            return this;
+        }
+
+        public NormalDistStats getProcessTime() {
+            return processTime;
+        }
+
+        public Builder withProcessTime(NormalDistStats processTime) {
+            this.processTime = processTime;
+            return this;
+        }
+
+        public GroupingType getGroupingType() {
+            return groupingType;
+        }
+
+        public Builder withGroupingType(GroupingType groupingType) {
+            this.groupingType = groupingType;
+            return this;
+        }
+
+        /**
+         * Set the grouping type based on the thrift Grouping class.
+         * @param grouping the Grouping to extract the grouping type from
+         * @return this
+         */
+        @SuppressWarnings("checkstyle:FallThrough")
+        public Builder withGroupingType(Grouping grouping) {
+            GroupingType group = GroupingType.SHUFFLE;
+            Grouping._Fields thriftType = grouping.getSetField();
+
+            switch (thriftType) {
+                case FIELDS:
+                    //Global Grouping is fields with an empty list
+                    if (grouping.get_fields().isEmpty()) {
+                        group = GroupingType.GLOBAL;
+                    } else {
+                        group = GroupingType.FIELDS;
+                    }
+                    break;
+                case ALL:
+                    group = GroupingType.ALL;
+                    break;
+                case NONE:
+                    group = GroupingType.NONE;
+                    break;
+                case SHUFFLE:
+                    group = GroupingType.SHUFFLE;
+                    break;
+                case LOCAL_OR_SHUFFLE:
+                    group = GroupingType.LOCAL_OR_SHUFFLE;
+                    break;
+                case CUSTOM_SERIALIZED:
+                    //This might be a partial key grouping..
+                    byte[] data = grouping.get_custom_serialized();
+                    try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+                         ObjectInputStream ois = new ObjectInputStream(bis)) {
+                        Object cg = ois.readObject();
+                        if (cg instanceof PartialKeyGrouping) {
+                            group = GroupingType.PARTIAL_KEY;
+                            break;
+                        }
+                    } catch (Exception e) {
+                        //ignored
+                    }
+                    //Fall through if not supported
+                default:
+                    LOG.warn("{} is not supported for replay of a topology.  Using SHUFFLE", thriftType);
+                    break;
+            }
+            return withGroupingType(group);
+        }
+
+        public InputStream build() {
+            return new InputStream(fromComponent, toComponent, id, execTime, processTime, groupingType);
+        }
+    }
+
+    /**
+     * Create a new input stream to a bolt.
+     * @param fromComponent the source component of the stream.
+     * @param toComponent the destination component of the stream.
+     * @param id the id of the stream.
+     * @param execTime exec time stats.
+     * @param processTime process time stats.
+     * @param groupingType the grouping used by the stream.
+     */
+    public InputStream(String fromComponent, String toComponent, String id, NormalDistStats execTime,
+                       NormalDistStats processTime, GroupingType groupingType) {
+        this.fromComponent = fromComponent;
+        this.toComponent = toComponent;
+        if (fromComponent == null) {
+            throw new IllegalArgumentException("from cannot be null");
+        }
+        if (toComponent == null) {
+            throw new IllegalArgumentException("to cannot be null");
+        }
+        this.id = id;
+        if (id == null) {
+            throw new IllegalArgumentException("id cannot be null");
+        }
+        this.execTime = execTime;
+        this.processTime = processTime;
+        this.groupingType = groupingType;
+        if (groupingType == null) {
+            throw new IllegalArgumentException("grouping type cannot be null");
+        }
+    }
+
+    /**
+     * Get the global stream id for this input stream.
+     * @return the GlobalStreamId for this input stream.
+     */
+    public synchronized GlobalStreamId gsid() {
+        if (gsid == null) {
+            gsid = new GlobalStreamId(fromComponent, id);
+        }
+        return gsid;
+    }
+
+    /**
+     * Remap the names of components.
+     * @param remappedComponents old name to new name of components.
+     * @param remappedStreams old ID to new ID of streams.
+     * @return a modified version of this with names remapped.
+     */
+    public InputStream remap(Map<String, String> remappedComponents, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+        String remapTo = remappedComponents.get(toComponent);
+        String remapFrom = remappedComponents.get(fromComponent);
+        GlobalStreamId remapStreamId = remappedStreams.get(gsid());
+        return new InputStream(remapFrom, remapTo, remapStreamId.get_streamId(), execTime, processTime, groupingType);
+    }
+
+    /**
+     * Replace all SHUFFLE groupings with LOCAL_OR_SHUFFLE.
+     * @return a modified copy of this
+     */
+    public InputStream replaceShuffleWithLocalOrShuffle() {
+        if (groupingType != GroupingType.SHUFFLE) {
+            return this;
+        }
+        return new InputStream(fromComponent, toComponent, id, execTime, processTime, GroupingType.LOCAL_OR_SHUFFLE);
+    }
+}
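
For reference, a hedged sketch of a config map that InputStream.fromConf can parse. The component names and stats are illustrative, and the exact strings GroupingType.fromConf accepts are defined in GroupingType.java (not shown in this hunk), so "SHUFFLE" is an assumption:

    Map<String, Object> grouping = new HashMap<>();
    grouping.put("type", "SHUFFLE");             // assumed grouping-type string
    grouping.put("streamId", "default");         // optional, defaults to "default"

    Map<String, Object> conf = new HashMap<>();
    conf.put("from", "spout-1");                 // illustrative component names
    conf.put("to", "bolt-1");
    conf.put("execTime", execStats.toConf());    // assumed NormalDistStats instances
    conf.put("processTime", procStats.toConf());
    conf.put("grouping", grouping);

    InputStream in = InputStream.fromConf(conf);
    // in.toConf() round-trips back to the same map shape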

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
new file mode 100644
index 0000000..e02e8f8
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bolt that simulates a real-world bolt, based on statistics measured about it.
+ */
+public class LoadBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(LoadBolt.class);
+    private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
+    private final List<OutputStream> outputStreamStats;
+    private List<OutputStreamEngine> outputStreams;
+    private final Map<GlobalStreamId, InputStream> inputStreams = new HashMap<>();
+    private OutputCollector collector;
+    private Random rand;
+    private ScheduledExecutorService timer;
+
+    private static long toNano(double ms) {
+        return (long)(ms * NANO_IN_MS);
+    }
+
+    public LoadBolt(LoadCompConf conf) {
+        //conf.streams may be null when built directly through the Builder, so guard against it
+        this.outputStreamStats = Collections.unmodifiableList(
+            conf.streams == null ? new ArrayList<>() : new ArrayList<>(conf.streams));
+    }
+
+    public void add(InputStream inputStream) {
+        GlobalStreamId id = inputStream.gsid();
+        inputStreams.put(id, inputStream);
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
+            .map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
+        this.collector = collector;
+        this.rand = ThreadLocalRandom.current();
+        this.timer = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    private final AtomicLong parkOffset = new AtomicLong(0);
+
+    private void mySleep(long endTime) {
+        //There are some different levels of accuracy here, and we want to deal with all of them
+        long start = System.nanoTime();
+        long newEnd = endTime - parkOffset.get();
+        long diff = newEnd - start;
+        if (diff <= 1_000) {
+            //We are done, nothing that short is going to work here
+        } else if (diff < NANO_IN_MS) {
+            //Busy wait; the throw-away sum just keeps the inner loop doing real work
+            long sum = 0;
+            while (System.nanoTime() < newEnd) {
+                for (long i = 0; i < 1_000_000; i++) {
+                    sum += i;
+                }
+            }
+        } else {
+            //More accurate than Thread.sleep, but still not great
+            LockSupport.parkNanos(newEnd - System.nanoTime() - parkOffset.get());
+        }
+        //A small control loop: shift parkOffset halfway toward the observed error so future sleeps are more accurate
+        parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+    }
+
+    private void emitTuples(Tuple input) {
+        for (OutputStreamEngine se: outputStreams) {
+            // we may output many tuples for a given input tuple
+            while (se.shouldEmit() != null) {
+                collector.emit(se.streamName, input, new Values(se.nextKey(), "SOME-BOLT-VALUE"));
+            }
+        }
+    }
+
+    @Override
+    public void execute(final Tuple input) {
+        long startTimeNs = System.nanoTime();
+        InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
+        if (in == null) {
+            emitTuples(input);
+            collector.ack(input);
+        } else {
+            //execTime models how long this executor thread stays busy; processTime models
+            //how long until the tuple is emitted and acked
+            long endExecNs = startTimeNs + toNano(in.execTime.nextRandom(rand));
+            long endProcNs = startTimeNs + toNano(in.processTime.nextRandom(rand));
+
+            //If the process time ends within ~1ms of the exec time, just sleep and ack inline
+            if ((endProcNs - 1_000_000) < endExecNs) {
+                mySleep(endProcNs);
+                emitTuples(input);
+                collector.ack(input);
+            } else {
+                //Otherwise emit and ack later from the timer thread, while this thread simulates the exec time below
+                timer.schedule(() -> {
+                    emitTuples(input);
+                    collector.ack(input);
+                }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
+            }
+
+            mySleep(endExecNs);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (OutputStream s: outputStreamStats) {
+            declarer.declareStream(s.id, new Fields("key", "value"));
+        }
+    }
+}
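
A hedged wiring sketch for attaching a LoadBolt to a topology. The LoadCompConf and InputStream instances are assumed to come from the Builders in this commit, the component names are illustrative, and TopologyBuilder is the standard org.apache.storm.topology.TopologyBuilder:

    LoadCompConf boltConf = new LoadCompConf.Builder()
        .withId("bolt-1")
        .withParallelism(2)
        .withStreams(new ArrayList<>())   // no simulated output streams in this sketch
        .build();

    LoadBolt bolt = new LoadBolt(boltConf);
    bolt.add(in);   // register the InputStream so execute() can look up its timing stats

    TopologyBuilder builder = new TopologyBuilder();
    builder.setBolt("bolt-1", bolt, boltConf.parallelism)
        .shuffleGrouping("spout-1");      // should match in.groupingType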

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
new file mode 100644
index 0000000..baead0f
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * Configuration for a simulated component (spout or bolt).
+ */
+public class LoadCompConf {
+    public final String id;
+    public final int parallelism;
+    public final List<OutputStream> streams;
+    public final CompStats stats;
+
+    /**
+     * Parse the LoadCompConf from a config Map.
+     * @param conf the map holding the config for a LoadCompConf.
+     * @return the parsed object.
+     */
+    public static LoadCompConf fromConf(Map<String, Object> conf) {
+        String id = (String) conf.get("id");
+        int parallelism = ObjectReader.getInt(conf.get("parallelism"), 1);
+        List<OutputStream> streams = new ArrayList<>();
+        List<Map<String, Object>> streamData = (List<Map<String, Object>>) conf.get("streams");
+        if (streamData != null) {
+            for (Map<String, Object> streamInfo: streamData) {
+                streams.add(OutputStream.fromConf(streamInfo));
+            }
+        }
+
+        return new LoadCompConf(id, parallelism, streams, CompStats.fromConf(conf));
+    }
+
+    /**
+     * Build a config map for this object.
+     * @return the config map.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("id", id);
+        ret.put("parallelism", parallelism);
+
+        if (streams != null) {
+            List<Map<String, Object>> streamData = new ArrayList<>();
+            for (OutputStream out : streams) {
+                streamData.add(out.toConf());
+            }
+            ret.put("streams", streamData);
+        }
+        if (stats != null) {
+            stats.addToConf(ret);
+        }
+        return ret;
+    }
+
+    /**
+     * Change the name of components and streams according to the parameters passed in.
+     * @param remappedComponents original component name to new component name.
+     * @param remappedStreams original stream id to new stream id.
+     * @return a copy of this with the values remapped.
+     */
+    public LoadCompConf remap(Map<String, String> remappedComponents, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+        String remappedId = remappedComponents.get(id);
+        List<OutputStream> remappedOutStreams = (streams == null) ? null :
+            streams.stream()
+                .map((orig) -> orig.remap(id, remappedStreams))
+                .collect(Collectors.toList());
+
+        return new LoadCompConf(remappedId, parallelism, remappedOutStreams, stats);
+    }
+
+    /**
+     * Scale the parallelism of this component by v.  The aggregate throughput will be the same.
+     * The parallelism will be rounded up to the next largest whole number.  Parallelism will always be at least 1.
+     * @param v 1.0 means no change; 0.5 cuts the parallelism in half.
+     * @return a copy of this with the parallelism adjusted.
+     */
+    public LoadCompConf scaleParallel(double v) {
+        return setParallel(Math.max(1, (int)Math.ceil(parallelism * v)));
+    }
+
+    /**
+     * Set the parallelism of this component, and adjust the throughput so in aggregate it stays the same.
+     * @param newParallelism the new parallelism to set.
+     * @return a copy of this with the adjustments made.
+     */
+    public LoadCompConf setParallel(int newParallelism) {
+        //We need to adjust the throughput accordingly (so that it stays the same in aggregate)
+        double throughputAdjustment = ((double)parallelism) / newParallelism;
+        return new LoadCompConf(id, newParallelism, streams, stats).scaleThroughput(throughputAdjustment);
+    }
+
+    /**
+     * Scale the throughput of this component.
+     * @param v 1.0 means no change; 0.5 cuts the throughput in half.
+     * @return a copy of this with the adjustments made.
+     */
+    public LoadCompConf scaleThroughput(double v) {
+        if (streams != null) {
+            List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
+            return new LoadCompConf(id, parallelism, newStreams, stats);
+        } else {
+            return this;
+        }
+    }
+
+    /**
+     * Compute the total amount of all messages emitted in all streams per second.
+     * @return the sum of all messages emitted per second.
+     */
+    public double getAllEmittedAggregate() {
+        double ret = 0;
+        if (streams != null) {
+            for (OutputStream out: streams) {
+                if (out.rate != null) {
+                    ret += out.rate.mean * parallelism;
+                }
+            }
+        }
+        return ret;
+    }
+
+    public static class Builder {
+        private String id;
+        private int parallelism = 1;
+        private List<OutputStream> streams;
+        private CompStats stats;
+
+        public String getId() {
+            return id;
+        }
+
+        public Builder withId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        public int getParallelism() {
+            return parallelism;
+        }
+
+        public Builder withParallelism(int parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
+        public List<OutputStream> getStreams() {
+            return streams;
+        }
+
+        /**
+         * Add in a single OutputStream to this component.
+         * @param stream the stream to add
+         * @return this
+         */
+        public Builder withStream(OutputStream stream) {
+            if (streams == null) {
+                streams = new ArrayList<>();
+            }
+            streams.add(stream);
+            return this;
+        }
+
+        public Builder withStreams(List<OutputStream> streams) {
+            this.streams = streams;
+            return this;
+        }
+
+        public CompStats getStats() {
+            return stats;
+        }
+
+        public Builder withStats(CompStats stats) {
+            this.stats = stats;
+            return this;
+        }
+
+        public LoadCompConf build() {
+            return new LoadCompConf(id, parallelism, streams, stats);
+        }
+    }
+
+    /**
+     * Create a new LoadCompConf with the given values.
+     * @param id the id of the component.
+     * @param parallelism the parallelism of the component.
+     * @param streams the output streams of the component.
+     * @param stats the stats of the component.
+     */
+    public LoadCompConf(String id, int parallelism, List<OutputStream> streams, CompStats stats) {
+        this.id = id;
+        if (id == null) {
+            throw new IllegalArgumentException("A spout ID cannot be null");
+        }
+        this.parallelism = parallelism;
+        this.streams = streams;
+        this.stats = stats;
+    }
+}
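
To make the aggregate-throughput invariant of setParallel concrete, a small worked example (assuming a LoadCompConf conf with parallelism 4 and a single output stream emitting 100 tuples/sec per instance; the numbers are illustrative):

    // conf.getAllEmittedAggregate() == 4 * 100 == 400.0
    LoadCompConf scaled = conf.setParallel(8);
    // throughputAdjustment = 4.0 / 8 = 0.5, so each instance now emits 50 tuples/sec
    // scaled.getAllEmittedAggregate() == 8 * 50 == 400.0 (unchanged in aggregate)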

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
new file mode 100644
index 0000000..9030379
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadEngine.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * The goal of this class is to provide a set of tuples to send whose characteristics match, as closely as
+ * possible, those measured from a production topology.
+ */
+public class LoadEngine {
+
+    //TODO need to do a lot...
+
+    /**
+     * Provides an API to simulate the timings and CPU utilization of a bolt or spout.
+     */
+    public static class InputTimingEngine {
+        private final Random rand;
+        private final CompStats stats;
+
+        public InputTimingEngine(CompStats stats) {
+            this.stats = stats;
+            rand = ThreadLocalRandom.current();
+        }
+    }
+}