Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:44 UTC

[03/18] storm git commit: STORM-2702: storm-loadgen

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
new file mode 100644
index 0000000..c126ca5
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -0,0 +1,793 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.HdrHistogram.Histogram;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A metrics server that records and reports metrics for a set of running topologies.
+ */
+public class LoadMetricsServer extends HttpForwardingMetricsServer {
+    private static final Logger LOG = LoggerFactory.getLogger(LoadMetricsServer.class);
+
+    private static class MemMeasure {
+        private long mem = 0;
+        private long time = 0;
+
+        synchronized void update(long mem) {
+            this.mem = mem;
+            time = System.currentTimeMillis();
+        }
+
+        public synchronized long get() {
+            return isExpired() ? 0L : mem;
+        }
+
+        synchronized boolean isExpired() {
+            return (System.currentTimeMillis() - time) >= 20000;
+        }
+    }
+
+    @VisibleForTesting
+    static double convert(double value, TimeUnit from, TimeUnit target) {
+        if (target.compareTo(from) > 0) {
+            return value / from.convert(1, target);
+        }
+        return value * target.convert(1, from);
+    }
+
+    public static class Measurements {
+        private final Histogram histo;
+        private double userMs;
+        private double sysMs;
+        private double gcMs;
+        private long memBytes;
+        private long uptimeSecs;
+        private long timeWindow;
+        private long acked;
+        private long failed;
+        private Set<String> topologyIds;
+
+        /**
+         * Constructor.
+         * @param uptimeSecs seconds the topologies have been up.
+         * @param acked the number of tuples acked in the window.
+         * @param timeWindow the length of the window in seconds.
+         * @param failed the number of tuples failed in the window.
+         * @param histo latency histogram.
+         * @param userMs user CPU in ms.
+         * @param sysMs system CPU in ms.
+         * @param gcMs GC CPU in ms.
+         * @param memBytes maximum memory used in bytes.
+         * @param topologyIds the ids of the topologies measured.
+         */
+        public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
+                            double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds) {
+            this.uptimeSecs = uptimeSecs;
+            this.acked = acked;
+            this.timeWindow = timeWindow;
+            this.failed = failed;
+            this.userMs = userMs;
+            this.sysMs = sysMs;
+            this.gcMs = gcMs;
+            this.histo = histo;
+            this.memBytes = memBytes;
+            this.topologyIds = topologyIds;
+        }
+
+        /**
+         * Default Constructor.
+         */
+        public Measurements() {
+            histo = new Histogram(3600000000000L, 3);
+            sysMs = 0;
+            userMs = 0;
+            gcMs = 0;
+            memBytes = 0;
+            uptimeSecs = 0;
+            timeWindow = 0;
+            acked = 0;
+            failed = 0;
+            topologyIds = new HashSet<>();
+        }
+
+        /**
+         * Add other to this.
+         * @param other measurements to add in.
+         */
+        public void add(Measurements other) {
+            histo.add(other.histo);
+            sysMs += other.sysMs;
+            userMs += other.userMs;
+            gcMs += other.gcMs;
+            memBytes = Math.max(memBytes, other.memBytes);
+            acked += other.acked;
+            failed += other.failed;
+            uptimeSecs = Math.max(uptimeSecs, other.uptimeSecs);
+            timeWindow += other.timeWindow;
+            topologyIds.addAll(other.topologyIds);
+        }
+
+        public double getLatencyAtPercentile(double percentile, TimeUnit unit) {
+            return convert(histo.getValueAtPercentile(percentile), TimeUnit.NANOSECONDS, unit);
+        }
+
+        public double getMinLatency(TimeUnit unit) {
+            return convert(histo.getMinValue(), TimeUnit.NANOSECONDS, unit);
+        }
+
+        public double getMaxLatency(TimeUnit unit) {
+            return convert(histo.getMaxValue(), TimeUnit.NANOSECONDS, unit);
+        }
+
+        public double getMeanLatency(TimeUnit unit) {
+            return convert(histo.getMean(), TimeUnit.NANOSECONDS, unit);
+        }
+
+        public double getLatencyStdDeviation(TimeUnit unit) {
+            return convert(histo.getStdDeviation(), TimeUnit.NANOSECONDS, unit);
+        }
+
+        public double getUserTime(TimeUnit unit) {
+            return convert(userMs, TimeUnit.MILLISECONDS, unit);
+        }
+
+        public double getSysTime(TimeUnit unit) {
+            return convert(sysMs, TimeUnit.MILLISECONDS, unit);
+        }
+
+        public double getGc(TimeUnit unit) {
+            return convert(gcMs, TimeUnit.MILLISECONDS, unit);
+        }
+
+        public double getMemMb() {
+            return memBytes / (1024.0 * 1024.0);
+        }
+
+        public long getUptimeSecs() {
+            return uptimeSecs;
+        }
+
+        public long getCompleted() {
+            return histo.getTotalCount();
+        }
+
+        public double getCompletedPerSec() {
+            return getCompleted() / (double)timeWindow;
+        }
+
+        public long getAcked() {
+            return acked;
+        }
+
+        public double getAckedPerSec() {
+            return acked / (double)timeWindow;
+        }
+
+        public long getFailed() {
+            return failed;
+        }
+
+        public long startTime() {
+            return uptimeSecs - timeWindow;
+        }
+
+        public long endTime() {
+            return uptimeSecs;
+        }
+
+        public double getTimeWindow() {
+            return timeWindow;
+        }
+
+        public Set<String> getTopologyIds() {
+            return topologyIds;
+        }
+
+        static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
+            if (count == null) {
+                count = measurements.size();
+            }
+
+            if (start == null) {
+                start = measurements.size() - count;
+            }
+            start = Math.max(0, start);
+            count = Math.min(count, measurements.size() - start);
+
+            Measurements ret = new Measurements();
+            for (int i = start; i < start + count; i++) {
+                ret.add(measurements.get(i));
+            }
+            return ret;
+        }
+    }
+
+    interface MetricResultsReporter {
+        void start();
+
+        void reportWindow(Measurements inWindow, List<Measurements> allTime);
+
+        void finish(List<Measurements> allTime) throws Exception;
+    }
+
+    abstract static class FileReporter implements MetricResultsReporter {
+        protected final PrintStream out;
+        private final boolean needsClose;
+
+        public FileReporter() throws FileNotFoundException {
+            //An empty query map (rather than null) keeps the other constructor from throwing a NullPointerException
+            this(null, Collections.emptyMap());
+        }
+
+        public FileReporter(String path, Map<String, String> query) throws FileNotFoundException {
+            boolean append = Boolean.parseBoolean(query.getOrDefault("append", "false"));
+
+            if (path == null || "/dev/stdout".equals(path)) {
+                out = System.out;
+                needsClose = false;
+            } else if ("/dev/stderr".equals(path)) {
+                out = System.err;
+                needsClose = false;
+            } else {
+                out = new PrintStream(new FileOutputStream(path, append));
+                needsClose = true;
+            }
+        }
+
+        @Override
+        public void start() {
+            //NOOP
+        }
+
+        @Override
+        public void finish(List<Measurements> allTime) throws Exception {
+            if (needsClose && out != null) {
+                out.close();
+            }
+        }
+    }
+
+    private static final Map<String, TimeUnit> UNIT_MAP;
+
+    static {
+        HashMap<String, TimeUnit> tmp = new HashMap<>();
+        tmp.put("NS", TimeUnit.NANOSECONDS);
+        tmp.put("NANO", TimeUnit.NANOSECONDS);
+        tmp.put("NANOSEC", TimeUnit.NANOSECONDS);
+        tmp.put("NANOSECOND", TimeUnit.NANOSECONDS);
+        tmp.put("NANOSECONDS", TimeUnit.NANOSECONDS);
+        tmp.put("μS", TimeUnit.MICROSECONDS);
+        tmp.put("US", TimeUnit.MICROSECONDS);
+        tmp.put("MICRO", TimeUnit.MICROSECONDS);
+        tmp.put("MICROSEC", TimeUnit.MICROSECONDS);
+        tmp.put("MICROSECOND", TimeUnit.MICROSECONDS);
+        tmp.put("MICROSECONDS", TimeUnit.MICROSECONDS);
+        tmp.put("MS", TimeUnit.MILLISECONDS);
+        tmp.put("MILLI", TimeUnit.MILLISECONDS);
+        tmp.put("MILLISEC", TimeUnit.MILLISECONDS);
+        tmp.put("MILLISECOND", TimeUnit.MILLISECONDS);
+        tmp.put("MILLISECONDS", TimeUnit.MILLISECONDS);
+        tmp.put("S", TimeUnit.SECONDS);
+        tmp.put("SEC", TimeUnit.SECONDS);
+        tmp.put("SECOND", TimeUnit.SECONDS);
+        tmp.put("SECONDS", TimeUnit.SECONDS);
+        tmp.put("M", TimeUnit.MINUTES);
+        tmp.put("MIN", TimeUnit.MINUTES);
+        tmp.put("MINUTE", TimeUnit.MINUTES);
+        tmp.put("MINUTES", TimeUnit.MINUTES);
+        UNIT_MAP = Collections.unmodifiableMap(tmp);
+    }
+
+    private static final Map<TimeUnit, String> TIME_UNIT_NAME;
+
+    static {
+        HashMap<TimeUnit, String> tmp = new HashMap<>();
+        tmp.put(TimeUnit.NANOSECONDS, "ns");
+        tmp.put(TimeUnit.MICROSECONDS, "μs");
+        tmp.put(TimeUnit.MILLISECONDS, "ms");
+        tmp.put(TimeUnit.SECONDS, "s");
+        tmp.put(TimeUnit.MINUTES, "m");
+        TIME_UNIT_NAME = Collections.unmodifiableMap(tmp);
+    }
+
+    private static final Map<String, MetricExtractor> NAMED_EXTRACTORS;
+
+    static {
+        //Perhaps there is a better way to do this???
+        HashMap<String, MetricExtractor> tmp = new HashMap<>();
+        tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
+        tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
+        tmp.put("median", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(50, unit)));
+        tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
+        tmp.put("min", new MetricExtractor((m, unit) -> m.getMinLatency(unit)));
+        tmp.put("max", new MetricExtractor((m, unit) -> m.getMaxLatency(unit)));
+        tmp.put("stddev", new MetricExtractor((m, unit) -> m.getLatencyStdDeviation(unit)));
+        tmp.put("user_cpu", new MetricExtractor((m, unit) -> m.getUserTime(unit)));
+        tmp.put("sys_cpu", new MetricExtractor((m, unit) -> m.getSysTime(unit)));
+        tmp.put("gc_cpu", new MetricExtractor((m, unit) -> m.getGc(unit)));
+        tmp.put("cores", new MetricExtractor(
+            (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
+            ""));
+        tmp.put("uptime",  new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
+        tmp.put("acked",  new MetricExtractor((m, unit) -> m.getAcked(), ""));
+        tmp.put("rate",  new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
+        tmp.put("completed",  new MetricExtractor((m, unit) -> m.getCompleted(), ""));
+        tmp.put("completion_rate",  new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+        tmp.put("mem",  new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
+        tmp.put("failed",  new MetricExtractor((m, unit) -> m.getFailed(), ""));
+        tmp.put("start_time",  new MetricExtractor((m, unit) -> m.startTime(),"s"));
+        tmp.put("end_time",  new MetricExtractor((m, unit) -> m.endTime(), "s"));
+        tmp.put("time_window",  new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
+        tmp.put("ids",  new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+        NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
+    }
+
+    static class MetricExtractor {
+        private final String unit;
+        private final BiFunction<Measurements, TimeUnit, Object> func;
+
+        public MetricExtractor(BiFunction<Measurements, TimeUnit, Object> func) {
+            this.func = func;
+            this.unit = null;
+        }
+
+        public MetricExtractor(BiFunction<Measurements, TimeUnit, Object> func, String unit) {
+            this.func = func;
+            this.unit = unit;
+        }
+
+        public Object get(Measurements m, TimeUnit unit) {
+            return func.apply(m, unit);
+        }
+
+        public String formatName(String name, TimeUnit targetUnit) {
+            StringBuilder ret = new StringBuilder();
+            ret.append(name);
+            if (unit == null || !unit.isEmpty()) {
+                ret.append("(");
+                if (unit == null) {
+                    ret.append(TIME_UNIT_NAME.get(targetUnit));
+                } else {
+                    ret.append(unit);
+                }
+                ret.append(")");
+            }
+            return ret.toString();
+        }
+    }
+
+    static class SepValReporter extends FileReporter {
+        private final TimeUnit targetUnit;
+        private final String separator;
+        private final List<String> extractors;
+        private final String meta;
+
+        public SepValReporter(String separator, String path, Map<String, String> query) throws FileNotFoundException {
+            super(path, query);
+            this.separator = separator;
+            targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
+            if (targetUnit == null) {
+                throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
+            }
+            if (query.containsKey("columns")) {
+                extractors = Arrays.asList(query.get("columns").split("\\s*,\\s*"));
+                HashSet<String> notFound = new HashSet<>(extractors);
+                notFound.removeAll(NAMED_EXTRACTORS.keySet());
+                if (notFound.size() > 0) {
+                    throw new IllegalArgumentException(notFound + " columns are not supported");
+                }
+            } else {
+                //Wrapping it makes it mutable
+                extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
+                    "mean", "99%ile", "99.9%ile", "cores", "mem", "failed"));
+            }
+
+            if (query.containsKey("extraColumns")) {
+                List<String> moreExtractors = Arrays.asList(query.get("extraColumns").split("\\s*,\\s*"));
+                for (String extractor: moreExtractors) {
+                    if (!NAMED_EXTRACTORS.containsKey(extractor)) {
+                        throw new IllegalArgumentException(extractor + " is not a supported column");
+                    }
+                    if (!extractors.contains(extractor)) {
+                        extractors.add(extractor);
+                    }
+                }
+            }
+
+            meta = query.get("meta");
+        }
+
+        @Override
+        public void start() {
+            boolean first = true;
+            for (String name: extractors) {
+                if (!first) {
+                    out.print(separator);
+                }
+                first = false;
+                out.print(NAMED_EXTRACTORS.get(name).formatName(name, targetUnit));
+            }
+            if (meta != null) {
+                out.print(separator);
+                out.print("meta");
+            }
+            out.println();
+        }
+
+        @Override
+        public void reportWindow(Measurements m, List<Measurements> allTime) {
+            boolean first = true;
+            for (String name: extractors) {
+                if (!first) {
+                    out.print(separator);
+                }
+                first = false;
+                Object value = NAMED_EXTRACTORS.get(name).get(m, targetUnit);
+                String svalue = value == null ? "" : value.toString();
+                out.print(escape(svalue));
+            }
+            if (meta != null) {
+                out.print(separator);
+                out.print(escape(meta));
+            }
+            out.println();
+        }
+
+        private String escape(String svalue) {
+            return svalue.replace("\\", "\\\\").replace(separator, "\\" + separator);
+        }
+    }
+
+    static class LegacyReporter extends FileReporter {
+        private final TimeUnit targetUnitOverride;
+
+        public LegacyReporter() throws FileNotFoundException {
+            super();
+            targetUnitOverride = null;
+        }
+
+        public LegacyReporter(String path, Map<String, String> query) throws FileNotFoundException {
+            super(path, query);
+            if (query.containsKey("time")) {
+                targetUnitOverride = UNIT_MAP.get(query.get("time").toUpperCase());
+                if (targetUnitOverride == null) {
+                    throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
+                }
+            } else {
+                targetUnitOverride = null;
+            }
+        }
+
+        @Override
+        public void reportWindow(Measurements m, List<Measurements> allTime) {
+            TimeUnit nsOr = TimeUnit.NANOSECONDS;
+            TimeUnit msOr = TimeUnit.MILLISECONDS;
+            if (targetUnitOverride != null) {
+                nsOr = targetUnitOverride;
+                msOr = targetUnitOverride;
+            }
+
+            Measurements total = Measurements.combine(allTime, null, null);
+            out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d "
+                    + "99%%: %,15.0f 99.9%%: %,15.0f min: %,15.0f max: %,15.0f mean: %,15.2f "
+                    + "stddev: %,15.2f user: %,10.0f sys: %,10.0f gc: %,10.0f mem: %,10.2f\n",
+                m.getUptimeSecs(), m.getAcked(), m.getAckedPerSec(), total.getFailed(),
+                m.getLatencyAtPercentile(99.0, nsOr),
+                m.getLatencyAtPercentile(99.9, nsOr),
+                m.getMinLatency(nsOr),
+                m.getMaxLatency(nsOr),
+                m.getMeanLatency(nsOr),
+                m.getLatencyStdDeviation(nsOr),
+                m.getUserTime(msOr),
+                m.getSysTime(msOr),
+                m.getGc(msOr),
+                m.getMemMb());
+        }
+    }
+
+    /**
+     * Add command line options for configuring the output of this server.
+     * @param options command line options to update
+     */
+    public static void addCommandLineOptions(Options options) {
+        //We want to be able to select the measurement interval
+        // reporting window (We don't need 3 different reports)
+        // We want to be able to specify format (and configs specific to the format)
+        // With perhaps defaults overall
+        options.addOption(Option.builder("r")
+            .longOpt("report-interval")
+            .hasArg()
+            .argName("INTERVAL_SECS")
+            .desc("How long in between reported metrics.  Will be rounded up to the next 10 sec boundary.\n"
+                + "default " + DEFAULT_REPORT_INTERVAL)
+            .build());
+
+        options.addOption(Option.builder("w")
+            .longOpt("report-window")
+            .hasArg()
+            .argName("INTERVAL_SECS")
+            .desc("How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary.\n"
+                + "default " + DEFAULT_WINDOW_INTERVAL)
+            .build());
+
+        options.addOption(Option.builder()
+            .longOpt("reporter")
+            .hasArg()
+            .argName("TYPE:PATH?OPTIONS")
+            .desc("Provide the config for a reporter to run.  Supported types are:\n"
+                + "LEGACY - (write things out in the legacy format)\n"
+                + "TSV - tab separated values\n"
+                + "CSV - comma separated values")
+            .build());
+
+    }
+
+    public static final long DEFAULT_REPORT_INTERVAL = 30;
+    public static final long DEFAULT_WINDOW_INTERVAL = DEFAULT_REPORT_INTERVAL;
+    private static final Pattern REPORTER_PATTERN = Pattern.compile(
+        "(?<type>[^:?]+)(?::(?<path>[^?]+))?(?:\\?(?<query>.*))?");
+
+    private final Histogram histo = new Histogram(3600000000000L, 3);
+    private final AtomicLong systemCpu = new AtomicLong(0);
+    private final AtomicLong userCpu = new AtomicLong(0);
+    private final AtomicLong gcCount = new AtomicLong(0);
+    private final AtomicLong gcMs = new AtomicLong(0);
+    private final ConcurrentHashMap<String, MemMeasure> memoryBytes = new ConcurrentHashMap<>();
+    private final List<MetricResultsReporter> reporters;
+    private long prevAcked = 0;
+    private long prevFailed = 0;
+    private long prevUptime = 0;
+    private int windowLength = 1;
+    private long reportIntervalSecs = DEFAULT_REPORT_INTERVAL;
+
+    private final LinkedList<Measurements> allCombined = new LinkedList<>();
+
+    LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine) throws URISyntaxException, FileNotFoundException {
+        super(conf);
+        if (commandLine.hasOption("r")) {
+            reportIntervalSecs = Long.parseLong(commandLine.getOptionValue("r"));
+            //Round up to the next 10 second boundary, as documented in the option help
+            reportIntervalSecs = ((reportIntervalSecs + 9) / 10) * 10;
+        }
+        if (commandLine.hasOption("w")) {
+            long window = Long.parseLong(commandLine.getOptionValue("w"));
+            //Round up to the next report interval boundary, as documented in the option help
+            windowLength = (int) ((window + reportIntervalSecs - 1) / reportIntervalSecs);
+        }
+        reporters = new ArrayList<>();
+        if (commandLine.hasOption("reporter")) {
+            for (String reporterString: commandLine.getOptionValues("reporter")) {
+                Matcher m = REPORTER_PATTERN.matcher(reporterString);
+                if (!m.matches()) {
+                    throw new IllegalArgumentException(reporterString + " does not look like a reporter");
+                }
+                String type = m.group("type");
+                String path = m.group("path");
+                Map<String, String> query = new HashMap<>();
+                String queryString = m.group("query");
+                if (queryString != null) {
+                    for (String param : queryString.split("&")) {
+                        String[] pair = param.split("=");
+                        String key = pair[0];
+                        String value = pair.length > 1 ? pair[1] : "true";
+                        query.put(key, value);
+                    }
+                }
+                type = type.toUpperCase();
+                switch (type) {
+                    case "LEGACY":
+                        reporters.add(new LegacyReporter(path, query));
+                        break;
+                    case "TSV":
+                        reporters.add(new SepValReporter("\t", path, query));
+                        break;
+                    case "CSV":
+                        reporters.add(new SepValReporter(",", path, query));
+                        break;
+                    default:
+                        throw new RuntimeException(type + " is not a supported reporter type");
+                }
+            }
+        } else {
+            reporters.add(new LegacyReporter());
+        }
+    }
+
+    private long readMemory() {
+        long total = 0;
+        for (MemMeasure mem: memoryBytes.values()) {
+            total += mem.get();
+        }
+        return total;
+    }
+
+    private void startMetricsOutput() {
+        for (MetricResultsReporter reporter: reporters) {
+            reporter.start();
+        }
+    }
+
+    private void finishMetricsOutput() throws Exception {
+        for (MetricResultsReporter reporter: reporters) {
+            reporter.finish(allCombined);
+        }
+    }
+
+    /**
+     * Monitor the list of topologies for the given time frame.
+     * @param execTimeMins how long to monitor for
+     * @param client the client to use when monitoring
+     * @param topoNames the names of the topologies to monitor
+     * @throws Exception on any error
+     */
+    public void monitorFor(double execTimeMins, Nimbus.Iface client, Collection<String> topoNames) throws Exception {
+        startMetricsOutput();
+        long iterations = (long) ((execTimeMins * 60) / reportIntervalSecs);
+        for (int i = 0; i < iterations; i++) {
+            Thread.sleep(reportIntervalSecs * 1000);
+            outputMetrics(client, topoNames);
+        }
+        finishMetricsOutput();
+    }
+
+    private void outputMetrics(Nimbus.Iface client, Collection<String> names) throws Exception {
+        ClusterSummary summary = client.getClusterInfo();
+        Set<String> ids = new HashSet<>();
+        for (TopologySummary ts: summary.get_topologies()) {
+            if (names.contains(ts.get_name())) {
+                ids.add(ts.get_id());
+            }
+        }
+        if (ids.size() != names.size()) {
+            throw new Exception("Could not find all topologies: " + names);
+        }
+        int uptime = 0;
+        long acked = 0;
+        long failed = 0;
+        for (String id: ids) {
+            TopologyInfo info = client.getTopologyInfo(id);
+            uptime = Math.max(uptime, info.get_uptime_secs());
+            for (ExecutorSummary exec : info.get_executors()) {
+                if (exec.get_stats() != null && exec.get_stats().get_specific() != null
+                    && exec.get_stats().get_specific().is_set_spout()) {
+                    SpoutStats stats = exec.get_stats().get_specific().get_spout();
+                    Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+                    Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+                    if (ackedMap != null) {
+                        for (String key : ackedMap.keySet()) {
+                            if (failedMap != null) {
+                                Long tmp = failedMap.get(key);
+                                if (tmp != null) {
+                                    failed += tmp;
+                                }
+                            }
+                            long ackVal = ackedMap.get(key);
+                            acked += ackVal;
+                        }
+                    }
+                }
+            }
+        }
+        @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+        long failedThisTime = failed - prevFailed;
+        @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+        long ackedThisTime = acked - prevAcked;
+        @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+        long thisTime = uptime - prevUptime;
+        prevUptime = uptime;
+        prevAcked = acked;
+        prevFailed = failed;
+
+        Histogram copy = new Histogram(3600000000000L, 3);
+        synchronized (histo) {
+            copy.add(histo);
+            histo.reset();
+        }
+        long user = userCpu.getAndSet(0);
+        long sys = systemCpu.getAndSet(0);
+        long gc = gcMs.getAndSet(0);
+        long memBytes = readMemory();
+
+        allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes, ids));
+        Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
+        for (MetricResultsReporter reporter: reporters) {
+            reporter.reportWindow(inWindow, allCombined);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
+        //crud no simple way to tie this to a given topology :(
+        String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
+        for (IMetricsConsumer.DataPoint dp: dataPoints) {
+            if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) {
+                synchronized (histo) {
+                    histo.add((Histogram)dp.value);
+                }
+            } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
+                Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                Object sys = m.get("sys-ms");
+                if (sys instanceof Number) {
+                    systemCpu.getAndAdd(((Number)sys).longValue());
+                }
+                Object user = m.get("user-ms");
+                if (user instanceof Number) {
+                    userCpu.getAndAdd(((Number)user).longValue());
+                }
+            } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
+                Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                Object count = m.get("count");
+                if (count instanceof Number) {
+                    gcCount.getAndAdd(((Number)count).longValue());
+                }
+                Object time = m.get("timeMs");
+                if (time instanceof Number) {
+                    gcMs.getAndAdd(((Number)time).longValue());
+                }
+            } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
+                Map<Object, Object> m = (Map<Object, Object>)dp.value;
+                Object val = m.get("usedBytes");
+                if (val instanceof Number) {
+                    MemMeasure mm = memoryBytes.get(worker);
+                    if (mm == null) {
+                        mm = new MemMeasure();
+                        MemMeasure tmp = memoryBytes.putIfAbsent(worker, mm);
+                        mm = tmp == null ? mm : tmp;
+                    }
+                    mm.update(((Number)val).longValue());
+                }
+            }
+        }
+    }
+}
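
For reviewers, a rough end-to-end usage sketch of how this server is meant to be driven.  This is not part of the patch: serve() is assumed to be inherited from HttpForwardingMetricsServer (not shown in this commit), and conf, client, and topoNames are placeholders.

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Options;

    // Hedged sketch, not part of the patch.
    Options options = new Options();
    LoadMetricsServer.addCommandLineOptions(options);
    // e.g. args = {"--reporter", "TSV:/tmp/load.tsv?time=MS&columns=start_time,end_time,rate,mean"}
    CommandLine cmd = new DefaultParser().parse(options, args);
    LoadMetricsServer server = new LoadMetricsServer(conf, cmd);
    server.serve();                             // assumption: starts the HTTP metrics listener
    server.monitorFor(5.0, client, topoNames);  // poll Nimbus and report for 5 minutes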

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
new file mode 100644
index 0000000..95ade59
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * A spout that simulates a real-world spout based on statistics gathered about it.
+ */
+public class LoadSpout extends BaseRichSpout {
+    private static class OutputStreamEngineWithHisto extends OutputStreamEngine {
+        public final HistogramMetric histogram;
+
+        public OutputStreamEngineWithHisto(OutputStream stats, TopologyContext context) {
+            super(stats);
+            histogram = new HistogramMetric(3600000000000L, 3);
+            //TODO perhaps we can adjust the frequency later...
+            context.registerMetric("comp-lat-histo-" + stats.id, histogram, 10);
+        }
+    }
+
+    private static class SentWithTime {
+        public final String streamName;
+        public final Values keyValue;
+        public final long time;
+        public final HistogramMetric histogram;
+
+        SentWithTime(String streamName, Values keyValue, long time, HistogramMetric histogram) {
+            this.streamName = streamName;
+            this.keyValue = keyValue;
+            this.time = time;
+            this.histogram = histogram;
+        }
+
+        public void done() {
+            histogram.recordValue(Math.max(0, System.nanoTime() - time));
+        }
+    }
+
+    private final List<OutputStream> streamStats;
+    private List<OutputStreamEngineWithHisto> streams;
+    private SpoutOutputCollector collector;
+    //This is an attempt to give all of the streams an equal opportunity to emit something.
+    private long nextStreamCounter = 0;
+    private final int numStreams;
+
+    /**
+     * Create a simple load spout with just a set rate per second on the default stream.
+     * @param ratePerSecond the rate to send messages at.
+     */
+    public LoadSpout(double ratePerSecond) {
+        OutputStream test = new OutputStream.Builder()
+            .withId("default")
+            .withRate(new NormalDistStats(ratePerSecond, 0.0, ratePerSecond, ratePerSecond))
+            .build();
+        streamStats = Arrays.asList(test);
+        numStreams = 1;
+    }
+
+    public LoadSpout(LoadCompConf conf) {
+        this.streamStats = Collections.unmodifiableList(new ArrayList<>(conf.streams));
+        numStreams = streamStats.size();
+    }
+
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+        streams = Collections.unmodifiableList(streamStats.stream()
+            .map((ss) -> new OutputStreamEngineWithHisto(ss, context)).collect(Collectors.toList()));
+        this.collector = collector;
+    }
+
+    @Override
+    public void nextTuple() {
+        int size = numStreams;
+        for (int tries = 0; tries < size; tries++) {
+            int index = Math.abs((int) (nextStreamCounter++ % size));
+            OutputStreamEngineWithHisto se = streams.get(index);
+            Long emitTupleTime = se.shouldEmit();
+            if (emitTupleTime != null) {
+                SentWithTime swt =
+                    new SentWithTime(se.streamName, getNextValues(se), emitTupleTime, se.histogram);
+                collector.emit(swt.streamName, swt.keyValue, swt);
+                break;
+            }
+        }
+    }
+
+    protected Values getNextValues(OutputStreamEngine se) {
+        return new Values(se.nextKey(), "JUST_SOME_VALUE");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (OutputStream s: streamStats) {
+            declarer.declareStream(s.id, new Fields("key", "value"));
+        }
+    }
+
+    @Override
+    public void ack(Object id) {
+        ((SentWithTime)id).done();
+    }
+
+    @Override
+    public void fail(Object id) {
+        SentWithTime swt = (SentWithTime)id;
+        collector.emit(swt.streamName, swt.keyValue, swt);
+    }
+}
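
A minimal wiring sketch for this spout; the downstream bolt (MyBolt) and the parallelism numbers are hypothetical placeholders.

    import org.apache.storm.topology.TopologyBuilder;

    // Hedged sketch: a LoadSpout emitting roughly 500 tuples/sec on its "default" stream.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new LoadSpout(500.0), 1);
    builder.setBolt("bolt", new MyBolt(), 4)    // MyBolt is hypothetical
        .shuffleGrouping("spout", "default");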

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
new file mode 100644
index 0000000..20b2926
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.storm.utils.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stats related to something with a normal distribution, and a way to randomly simulate it.
+ */
+public class NormalDistStats implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(NormalDistStats.class);
+    public final double mean;
+    public final double stddev;
+    public final double min;
+    public final double max;
+
+    /**
+     * Read the stats from a config.
+     * @param conf the config.
+     * @return the corresponding stats.
+     */
+    public static NormalDistStats fromConf(Map<String, Object> conf) {
+        return fromConf(conf, null);
+    }
+
+    /**
+     * Read the stats from a config.
+     * @param conf the config.
+     * @param def the default mean.
+     * @return the corresponding stats.
+     */
+    public static NormalDistStats fromConf(Map<String, Object> conf, Double def) {
+        if (conf == null) {
+            conf = Collections.emptyMap();
+        }
+        double mean = ObjectReader.getDouble(conf.get("mean"), def);
+        double stddev = ObjectReader.getDouble(conf.get("stddev"), mean / 4);
+        double min = ObjectReader.getDouble(conf.get("min"), 0.0);
+        double max = ObjectReader.getDouble(conf.get("max"), Double.MAX_VALUE);
+        return new NormalDistStats(mean, stddev, min, max);
+    }
+
+    /**
+     * Return this as a config.
+     * @return the config version of this.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("mean", mean);
+        ret.put("stddev", stddev);
+        ret.put("min", min);
+        ret.put("max", max);
+        return ret;
+    }
+
+    /**
+     * Create an instance of this from a list of values.  The metrics will be computed from the values.
+     * @param values the values to compute metrics from.
+     */
+    public NormalDistStats(List<Double> values) {
+        //Compute the stats for these and save them
+        double min = values.isEmpty() ? 0.0 : values.get(0);
+        double max = values.isEmpty() ? 0.0 : values.get(0);
+        double sum = 0.0;
+        long count = values.size();
+        for (Double v: values) {
+            sum += v;
+            min = Math.min(min, v);
+            max = Math.max(max, v);
+        }
+        double mean = sum / Math.max(count, 1);
+        double sdPartial = 0;
+        for (Double v: values) {
+            sdPartial += Math.pow(v - mean, 2);
+        }
+        double stddev = 0.0;
+        if (count >= 2) {
+            stddev = Math.sqrt(sdPartial / (count - 1));
+        }
+        this.min = min;
+        this.max = max;
+        this.mean = mean;
+        this.stddev = stddev;
+        LOG.debug("Stats for {} are {}", values, this);
+    }
+
+    /**
+     * A Constructor for the pre computed stats.
+     * @param mean the mean of the values.
+     * @param stddev the standard deviation of the values.
+     * @param min the min of the values.
+     * @param max the max of the values.
+     */
+    public NormalDistStats(double mean, double stddev, double min, double max) {
+        this.mean = mean;
+        this.stddev = stddev;
+        this.min = min;
+        this.max = max;
+    }
+
+    /**
+     * Generate a random number that follows the statistical distribution.
+     * @param rand the random number generator to use
+     * @return the next number that should follow the statistical distribution.
+     */
+    public double nextRandom(Random rand) {
+        return Math.max(Math.min((rand.nextGaussian() * stddev) + mean, max), min);
+    }
+
+    @Override
+    public String toString() {
+        return "mean: " + mean + " min: " + min + " max: " + max + " stddev: " + stddev;
+    }
+
+    /**
+     * Scale the stats by v.  This does not scale everything proportionally: we don't want the stddev to increase,
+     * so instead we scale the mean and shift everything up or down by the same amount.
+     * @param v the amount to scale by; 1.0 leaves things unchanged, 0.5 halves the mean.
+     * @return a copy of this with the needed adjustments.
+     */
+    public NormalDistStats scaleBy(double v) {
+        double newMean = mean * v;
+        double shiftAmount = newMean - mean;
+        return new NormalDistStats(Math.max(0, mean + shiftAmount), stddev,
+            Math.max(0, min + shiftAmount), Math.max(0, max + shiftAmount));
+    }
+}
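
A short sketch of the config round trip and sampling these methods provide; the numbers are made up.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    // Hedged sketch: build stats from a config, sample them, and serialize back out.
    Map<String, Object> statsConf = new HashMap<>();
    statsConf.put("mean", 100.0);
    statsConf.put("stddev", 25.0);
    statsConf.put("min", 10.0);
    statsConf.put("max", 500.0);
    NormalDistStats stats = NormalDistStats.fromConf(statsConf);
    double sample = stats.nextRandom(new Random(42)); // gaussian sample clamped to [10.0, 500.0]
    Map<String, Object> roundTrip = stats.toConf();   // the same four keys come back out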

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
new file mode 100644
index 0000000..99357d9
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.generated.GlobalStreamId;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce it.
+ */
+public class OutputStream implements Serializable {
+    //The global stream id is this id combined with the component that emits it.
+    public final String id;
+    public final NormalDistStats rate;
+    public final boolean areKeysSkewed;
+
+    /**
+     * Create an output stream from a config.
+     * @param conf the config to read from.
+     * @return the read OutputStream.
+     */
+    public static OutputStream fromConf(Map<String, Object> conf) {
+        String streamId = (String) conf.getOrDefault("streamId", "default");
+        NormalDistStats rate = NormalDistStats.fromConf((Map<String, Object>) conf.get("rate"));
+        boolean areKeysSkewed = (Boolean) conf.getOrDefault("areKeysSkewed", false);
+        return new OutputStream(streamId, rate, areKeysSkewed);
+    }
+
+    /**
+     * Convert this to a conf.
+     * @return the conf.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("streamId", id);
+        ret.put("rate", rate.toConf());
+        ret.put("areKeysSkewed", areKeysSkewed);
+        return ret;
+    }
+
+    public OutputStream remap(String origId, Map<GlobalStreamId, GlobalStreamId> remappedStreams) {
+        GlobalStreamId remapped = remappedStreams.get(new GlobalStreamId(origId, id));
+        return new OutputStream(remapped.get_streamId(), rate, areKeysSkewed);
+    }
+
+    public OutputStream scaleThroughput(double v) {
+        return new OutputStream(id, rate.scaleBy(v), areKeysSkewed);
+    }
+
+    public static class Builder {
+        private String id;
+        private NormalDistStats rate;
+        private boolean areKeysSkewed;
+
+        public String getId() {
+            return id;
+        }
+
+        public Builder withId(String id) {
+            this.id = id;
+            return this;
+        }
+
+        public NormalDistStats getRate() {
+            return rate;
+        }
+
+        public Builder withRate(NormalDistStats rate) {
+            this.rate = rate;
+            return this;
+        }
+
+        public boolean isAreKeysSkewed() {
+            return areKeysSkewed;
+        }
+
+        public Builder withAreKeysSkewed(boolean areKeysSkewed) {
+            this.areKeysSkewed = areKeysSkewed;
+            return this;
+        }
+
+        public OutputStream build() {
+            return new OutputStream(id, rate, areKeysSkewed);
+        }
+    }
+
+    /**
+     * Create a new stream with stats.
+     * @param id the id of the stream
+     * @param rate the rate of tuples being emitted on this stream
+     * @param areKeysSkewed true if keys are skewed else false.  For skewed keys
+     *     we only simulate it by using a gaussian distribution for the keys instead
+     *     of an even distribution.  There is no effort made right now to measure the
+     *     skewness and reproduce it.
+     */
+    public OutputStream(String id, NormalDistStats rate, boolean areKeysSkewed) {
+        this.id = id;
+        this.rate = rate;
+        this.areKeysSkewed = areKeysSkewed;
+    }
+}
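
The two construction paths, as a sketch; the rate numbers are arbitrary.

    // Hedged sketch: the builder and the direct constructor produce equivalent streams.
    OutputStream viaBuilder = new OutputStream.Builder()
        .withId("default")
        .withRate(new NormalDistStats(100.0, 10.0, 50.0, 200.0))
        .withAreKeysSkewed(true)
        .build();
    OutputStream direct = new OutputStream("default",
        new NormalDistStats(100.0, 10.0, 50.0, 200.0), true);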

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
new file mode 100644
index 0000000..80111c8
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Provides an API to simulate the output of a stream.
+ * <p>
+ * Right now it is just rate, but in the future we expect to do data skew as well...
+ * </p>
+ */
+public class OutputStreamEngine {
+    private static final double NANO_PER_SEC = 1_000_000_000.0;
+    private static final long UPDATE_RATE_PERIOD_NS = ((long)NANO_PER_SEC * 2);
+    private static final String[] KEYS = new String[2048];
+
+    static {
+        //We create a new random number generator with a fixed seed so runs are consistent where possible.
+        Random r = new Random(KEYS.length);
+        for (int i = 0; i < KEYS.length; i++) {
+            KEYS[i] = String.valueOf(r.nextDouble());
+        }
+    }
+
+    private long periodNano;
+    private long emitAmount;
+    private final Random rand;
+    private long nextEmitTime;
+    private long nextRateRandomizeTime;
+    private long emitsLeft;
+    private final OutputStream stats;
+    public final String streamName;
+
+    /**
+     * Create an engine that can simulate the given stats.
+     * @param stats the stats to follow
+     */
+    public OutputStreamEngine(OutputStream stats) {
+        this.stats = stats;
+        rand = ThreadLocalRandom.current();
+        selectNewRate();
+        //Start emitting right now
+        nextEmitTime = System.nanoTime();
+        nextRateRandomizeTime = nextEmitTime + UPDATE_RATE_PERIOD_NS;
+        emitsLeft = emitAmount;
+        streamName = stats.id;
+    }
+
+    private void selectNewRate() {
+        double ratePerSecond = stats.rate.nextRandom(rand);
+        if (ratePerSecond > 0) {
+            periodNano = Math.max(1, (long)(NANO_PER_SEC / ratePerSecond));
+            emitAmount = Math.max(1, (long)((ratePerSecond / NANO_PER_SEC) * periodNano));
+        } else {
+            //if it is 0 or less treat it as 1 tuple per 10 seconds.
+            periodNano = (long)NANO_PER_SEC * 10;
+            emitAmount = 1;
+        }
+    }
+
+    /**
+     * Should we emit or not.
+     * @return the start time of the message, or null if nothing should be emitted.
+     */
+    public Long shouldEmit() {
+        long time = System.nanoTime();
+        if (emitsLeft <= 0 && nextEmitTime <= time) {
+            emitsLeft = emitAmount;
+            nextEmitTime = nextEmitTime + periodNano;
+        }
+
+        if (nextRateRandomizeTime <= time) {
+            //Once every UPDATE_RATE_PERIOD_NS
+            selectNewRate();
+            nextRateRandomizeTime = nextEmitTime + UPDATE_RATE_PERIOD_NS;
+        }
+
+        if (emitsLeft > 0) {
+            emitsLeft--;
+            return nextEmitTime - periodNano;
+        }
+        return null;
+    }
+
+    /**
+     * Get the next key to emit.
+     * @return the key that should be emitted.
+     */
+    public String nextKey() {
+        int keyIndex;
+        if (stats.areKeysSkewed) {
+            //We set the stddev of the skewed keys to be 1/5 of the length, but then we use the absolute value
+            // of that so everything is skewed towards 0
+            keyIndex = Math.min(KEYS.length - 1, Math.abs((int)(rand.nextGaussian() * KEYS.length / 5)));
+        } else {
+            keyIndex = rand.nextInt(KEYS.length);
+        }
+        return KEYS[keyIndex];
+    }
+
+    public int nextInt(int bound) {
+        return rand.nextInt(bound);
+    }
+}
\ No newline at end of file
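
The intended call pattern mirrors LoadSpout.nextTuple() above; in this sketch emit() is a hypothetical stand-in for whatever the caller does with the key.

    // Hedged sketch: poll shouldEmit() in a loop and emit whenever it returns a start time.
    OutputStreamEngine engine = new OutputStreamEngine(stream);  // stream is an OutputStream
    while (!Thread.currentThread().isInterrupted()) {
        Long startTimeNs = engine.shouldEmit();
        if (startTimeNs != null) {
            emit(engine.nextKey(), startTimeNs);  // hypothetical downstream emit
        }
    }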

http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
new file mode 100644
index 0000000..1b6ed74
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ScopedTopologySet.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of topology names that will be killed when this is closed.
+ */
+public class ScopedTopologySet extends HashSet<String> implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ScopedTopologySet.class);
+    private static final KillOptions NO_WAIT_KILL = new KillOptions();
+
+    static {
+        NO_WAIT_KILL.set_wait_secs(0);
+    }
+
+    private final Nimbus.Iface client;
+    private final Set<String> unmodWrapper;
+
+    public ScopedTopologySet(Nimbus.Iface client) {
+        this.client = client;
+        unmodWrapper = Collections.unmodifiableSet(this);
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new RuntimeException("Unmodifiable Set");
+    }
+
+    @Override
+    public void clear() {
+        throw new RuntimeException("Unmodifiable Set");
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new RuntimeException("Unmodifiable Set");
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new RuntimeException("Unmodifiable Set");
+    }
+
+    @Override
+    public void close() {
+        RuntimeException saved = null;
+        for (Iterator<String> it = super.iterator(); it.hasNext();) {
+            String name = it.next();
+            try {
+                client.killTopologyWithOpts(name, NO_WAIT_KILL);
+            } catch (Exception e) {
+                RuntimeException wrapped = new RuntimeException("Error trying to kill " + name, e);
+                if (saved != null) {
+                    saved.addSuppressed(wrapped);
+                } else {
+                    saved = wrapped;
+                }
+            }
+        }
+        super.clear();
+        if (saved != null) {
+            throw saved;
+        }
+    }
+}

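[Editor's note] ScopedTopologySet is meant to be used with try-with-resources, so every topology registered during a run is killed (wait_secs=0) even when the run fails part way through, which is exactly how ThroughputVsLatency below uses it. A minimal sketch, assuming the topology wiring is done elsewhere (the "demo-topo" name is hypothetical):

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.loadgen.ScopedTopologySet;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.NimbusClient;

    public class ScopedTopologySetDemo {
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            TopologyBuilder builder = new TopologyBuilder();
            //... wire up spouts and bolts here ...
            try (ScopedTopologySet topoNames = new ScopedTopologySet(client.getClient())) {
                StormSubmitter.submitTopology("demo-topo", conf, builder.createTopology());
                topoNames.add("demo-topo"); //registered, so close() will kill it
                //... drive the test; when the block exits, close() kills each
                // registered topology and chains any kill failures as
                // suppressed exceptions on the first one thrown.
            }
        }
    }
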
http://git-wip-us.apache.org/repos/asf/storm/blob/6c2dcbed/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 96c13c5..2c22b42 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -15,114 +15,67 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.starter;
 
-import java.util.Collection;
+package org.apache.storm.loadgen;
+
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.HdrHistogram.Histogram;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.KillOptions;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.metric.api.IMetricsConsumer.DataPoint;
-import org.apache.storm.metric.api.IMetricsConsumer.TaskInfo;
-import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
-import org.apache.storm.misc.metric.HttpForwardingMetricsServer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
+import org.apache.storm.metric.LoggingMetricsConsumer;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseBasicBolt;
-import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * WordCount but the spout goes at a predefined rate and we collect
  * proper latency statistics.
  */
 public class ThroughputVsLatency {
-    private static class SentWithTime {
-        public final String sentence;
-        public final long time;
-
-        SentWithTime(String sentence, long time) {
-            this.sentence = sentence;
-            this.time = time;
-        }
-    }
-
-    public static class FastRandomSentenceSpout extends BaseRichSpout {
-        static final String[] SENTENCES = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-
-        SpoutOutputCollector _collector;
-        long _periodNano;
-        long _emitAmount;
-        Random _rand;
-        long _nextEmitTime;
-        long _emitsLeft;
-        HistogramMetric _histo;
+    private static final Logger LOG = LoggerFactory.getLogger(ThroughputVsLatency.class);
+    private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
+    private static final long DEFAULT_RATE_PER_SECOND = 500;
+    private static final String DEFAULT_TOPO_NAME = "wc-test";
+    private static final int DEFAULT_NUM_SPOUTS = 1;
+    private static final int DEFAULT_NUM_SPLITS = 1;
+    private static final int DEFAULT_NUM_COUNTS = 1;
+
+    public static class FastRandomSentenceSpout extends LoadSpout {
+        static final String[] SENTENCES = new String[] {
+            "the cow jumped over the moon",
+            "an apple a day keeps the doctor away",
+            "four score and seven years ago",
+            "snow white and the seven dwarfs",
+            "i am at two with nature"
+        };
 
+        /**
+         * Constructor.
+         * @param ratePerSecond the rate to emit tuples at.
+         */
         public FastRandomSentenceSpout(long ratePerSecond) {
-            if (ratePerSecond > 0) {
-                _periodNano = Math.max(1, 1000000000/ratePerSecond);
-                _emitAmount = Math.max(1, (long)((ratePerSecond / 1000000000.0) * _periodNano));
-            } else {
-                _periodNano = Long.MAX_VALUE - 1;
-                _emitAmount = 1;
-            }
+            super(ratePerSecond);
         }
 
         @Override
-        public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-            _rand = ThreadLocalRandom.current();
-            _nextEmitTime = System.nanoTime();
-            _emitsLeft = _emitAmount;
-            _histo = new HistogramMetric(3600000000000L, 3);
-            context.registerMetric("comp-lat-histo", _histo, 10); //Update every 10 seconds, so we are not too far behind
-        }
-
-        @Override
-        public void nextTuple() {
-            if (_emitsLeft <= 0 && _nextEmitTime <= System.nanoTime()) {
-                _emitsLeft = _emitAmount;
-                _nextEmitTime = _nextEmitTime + _periodNano;
-            }
-
-            if (_emitsLeft > 0) {
-                String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
-                _collector.emit(new Values(sentence), new SentWithTime(sentence, _nextEmitTime - _periodNano));
-                _emitsLeft--;
-            }
-        }
-
-        @Override
-        public void ack(Object id) {
-            long end = System.nanoTime();
-            SentWithTime st = (SentWithTime)id;
-            _histo.recordValue(end-st.time);
-        }
-
-        @Override
-        public void fail(Object id) {
-            SentWithTime st = (SentWithTime)id;
-            _collector.emit(new Values(st.sentence), id);
+        protected Values getNextValues(OutputStreamEngine se) {
+            String sentence = SENTENCES[se.nextInt(SENTENCES.length)];
+            return new Values(sentence);
         }
 
         @Override
@@ -147,14 +100,15 @@ public class ThroughputVsLatency {
     }
 
     public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
+        Map<String, Integer> counts = new HashMap<>();
 
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             String word = tuple.getString(0);
             Integer count = counts.get(word);
-            if (count == null)
+            if (count == null) {
                 count = 0;
+            }
             count++;
             counts.put(word, count);
             collector.emit(new Values(word, count));
@@ -166,184 +120,103 @@ public class ThroughputVsLatency {
         }
     }
 
-    private static class MemMeasure {
-        private long _mem = 0;
-        private long _time = 0;
-
-        public synchronized void update(long mem) {
-            _mem = mem;
-            _time = System.currentTimeMillis();
-        }
-
-        public synchronized long get() {
-            return isExpired() ? 0l : _mem;
-        }
-
-        public synchronized boolean isExpired() {
-            return (System.currentTimeMillis() - _time) >= 20000;
-        }
-    }
-
-    private static final Histogram _histo = new Histogram(3600000000000L, 3);
-    private static final AtomicLong _systemCPU = new AtomicLong(0);
-    private static final AtomicLong _userCPU = new AtomicLong(0);
-    private static final AtomicLong _gcCount = new AtomicLong(0);
-    private static final AtomicLong _gcMs = new AtomicLong(0);
-    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<String, MemMeasure>();
-
-    private static long readMemory() {
-        long total = 0;
-        for (MemMeasure mem: _memoryBytes.values()) {
-            total += mem.get();
-        }
-        return total;
-    }
-
-    private static long _prev_acked = 0;
-    private static long _prev_uptime = 0;
-
-    public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
-        ClusterSummary summary = client.getClusterInfo();
-        String id = null;
-        for (TopologySummary ts: summary.get_topologies()) {
-            if (name.equals(ts.get_name())) {
-                id = ts.get_id();
+    /**
+     * The main entry point for ThroughputVsLatency.
+     * @param args the command line args
+     * @throws Exception on any error.
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        options.addOption(Option.builder("h")
+            .longOpt("help")
+            .desc("Print a help message")
+            .build());
+        options.addOption(Option.builder("t")
+            .longOpt("test-time")
+            .argName("MINS")
+            .hasArg()
+            .desc("How long to run the tests for in mins (defaults to " + TEST_EXECUTE_TIME_DEFAULT + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("rate")
+            .argName("SENTENCES/SEC")
+            .hasArg()
+            .desc("How many sentences per second to run. (defaults to " + DEFAULT_RATE_PER_SECOND + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("name")
+            .argName("TOPO_NAME")
+            .hasArg()
+            .desc("Name of the topology to run (defaults to " + DEFAULT_TOPO_NAME + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("spouts")
+            .argName("NUM")
+            .hasArg()
+            .desc("Number of spouts to use (defaults to " + DEFAULT_NUM_SPOUTS + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("splitters")
+            .argName("NUM")
+            .hasArg()
+            .desc("Number of splitter bolts to use (defaults to " + DEFAULT_NUM_SPLITS + ")")
+            .build());
+        options.addOption(Option.builder()
+            .longOpt("counters")
+            .argName("NUM")
+            .hasArg()
+            .desc("Number of counter bolts to use (defaults to " + DEFAULT_NUM_COUNTS + ")")
+            .build());
+        LoadMetricsServer.addCommandLineOptions(options);
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmd = null;
+        Exception commandLineException = null;
+        double numMins = TEST_EXECUTE_TIME_DEFAULT;
+        double ratePerSecond = DEFAULT_RATE_PER_SECOND;
+        String name = DEFAULT_TOPO_NAME;
+        int numSpouts = DEFAULT_NUM_SPOUTS;
+        int numSplits = DEFAULT_NUM_SPLITS;
+        int numCounts = DEFAULT_NUM_COUNTS;
+        try {
+            cmd = parser.parse(options, args);
+            if (cmd.hasOption("t")) {
+                numMins = Double.parseDouble(cmd.getOptionValue("t"));
             }
-        }
-        if (id == null) {
-            throw new Exception("Could not find a topology named "+name);
-        }
-        TopologyInfo info = client.getTopologyInfo(id);
-        int uptime = info.get_uptime_secs();
-        long acked = 0;
-        long failed = 0;
-        for (ExecutorSummary exec: info.get_executors()) {
-            if ("spout".equals(exec.get_component_id()) && exec.get_stats() != null && exec.get_stats().get_specific() != null) {
-                SpoutStats stats = exec.get_stats().get_specific().get_spout();
-                Map<String, Long> failedMap = stats.get_failed().get(":all-time");
-                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
-                if (ackedMap != null) {
-                    for (String key: ackedMap.keySet()) {
-                        if (failedMap != null) {
-                            Long tmp = failedMap.get(key);
-                            if (tmp != null) {
-                                failed += tmp;
-                            }
-                        }
-                        long ackVal = ackedMap.get(key);
-                        acked += ackVal;
-                    }
-                }
+            if (cmd.hasOption("rate")) {
+                ratePerSecond = Double.parseDouble(cmd.getOptionValue("rate"));
             }
+            if (cmd.hasOption("name")) {
+                name = cmd.getOptionValue("name");
+            }
+            if (cmd.hasOption("spouts")) {
+                numSpouts = Integer.parseInt(cmd.getOptionValue("spouts"));
+            }
+            if (cmd.hasOption("splitters")) {
+                numSplits = Integer.parseInt(cmd.getOptionValue("splitters"));
+            }
+            if (cmd.hasOption("counters")) {
+                numCounts = Integer.parseInt(cmd.getOptionValue("counters"));
+            }
+        } catch (ParseException | NumberFormatException e) {
+            commandLineException = e;
         }
-        long ackedThisTime = acked - _prev_acked;
-        long thisTime = uptime - _prev_uptime;
-        long nnpct, nnnpct, min, max;
-        double mean, stddev;
-        synchronized(_histo) {
-            nnpct = _histo.getValueAtPercentile(99.0);
-            nnnpct = _histo.getValueAtPercentile(99.9);
-            min = _histo.getMinValue();
-            max = _histo.getMaxValue();
-            mean = _histo.getMean();
-            stddev = _histo.getStdDeviation();
-            _histo.reset();
-        }
-        long user = _userCPU.getAndSet(0);
-        long sys = _systemCPU.getAndSet(0);
-        long gc = _gcMs.getAndSet(0);
-        double memMB = readMemory() / (1024.0 * 1024.0);
-        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d " +
-                "99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f " +
-                "stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n",
-                uptime, ackedThisTime, (((double)ackedThisTime)/thisTime), failed, nnpct, nnnpct,
-                min, max, mean, stddev, user, sys, gc, memMB);
-        _prev_uptime = uptime;
-        _prev_acked = acked;
-    }
-
-    public static void kill(Nimbus.Iface client, String name) throws Exception {
-        KillOptions opts = new KillOptions();
-        opts.set_wait_secs(0);
-        client.killTopologyWithOpts(name, opts);
-    }
-
-    public static void main(String[] args) throws Exception {
-        long ratePerSecond = 500;
-        if (args != null && args.length > 0) {
-            ratePerSecond = Long.valueOf(args[0]);
-        }
-
-        int parallelism = 4;
-        if (args != null && args.length > 1) {
-            parallelism = Integer.valueOf(args[1]);
-        }
-
-        int numMins = 5;
-        if (args != null && args.length > 2) {
-            numMins = Integer.valueOf(args[2]);
-        }
-
-        String name = "wc-test";
-        if (args != null && args.length > 3) {
-            name = args[3];
+        if (commandLineException != null || cmd.hasOption('h')) {
+            if (commandLineException != null) {
+                System.err.println("ERROR " + commandLineException.getMessage());
+            }
+            new HelpFormatter().printHelp("ThroughputVsLatency [options]", options);
+            return;
         }
 
         Config conf = new Config();
-        HttpForwardingMetricsServer metricServer = new HttpForwardingMetricsServer(conf) {
-            @Override
-            public void handle(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-                String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
-                for (DataPoint dp: dataPoints) {
-                    if ("comp-lat-histo".equals(dp.name) && dp.value instanceof Histogram) {
-                        synchronized(_histo) {
-                            _histo.add((Histogram)dp.value);
-                        }
-                    } else if ("CPU".equals(dp.name) && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object sys = m.get("sys-ms");
-                        if (sys instanceof Number) {
-                            _systemCPU.getAndAdd(((Number)sys).longValue());
-                        }
-                        Object user = m.get("user-ms");
-                        if (user instanceof Number) {
-                            _userCPU.getAndAdd(((Number)user).longValue());
-                        }
-                    } else if (dp.name.startsWith("GC/") && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object count = m.get("count");
-                        if (count instanceof Number) {
-                            _gcCount.getAndAdd(((Number)count).longValue());
-                        }
-                        Object time = m.get("timeMs");
-                        if (time instanceof Number) {
-                            _gcMs.getAndAdd(((Number)time).longValue());
-                        }
-                    } else if (dp.name.startsWith("memory/") && dp.value instanceof Map) {
-                        Map<Object, Object> m = (Map<Object, Object>)dp.value;
-                        Object val = m.get("usedBytes");
-                        if (val instanceof Number) {
-                            MemMeasure mm = _memoryBytes.get(worker);
-                            if (mm == null) {
-                                mm = new MemMeasure();
-                                MemMeasure tmp = _memoryBytes.putIfAbsent(worker, mm);
-                                mm = tmp == null ? mm : tmp; 
-                            }
-                            mm.update(((Number)val).longValue());
-                        }
-                    }
-                }
-            }
-        };
-
+        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
         metricServer.serve();
         String url = metricServer.getUrl();
 
         NimbusClient client = NimbusClient.getConfiguredClient(conf);
-        conf.setNumWorkers(parallelism);
-        conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class);
-        conf.registerMetricsConsumer(org.apache.storm.misc.metric.HttpForwardingMetricsConsumer.class, url, 1);
-        Map<String, String> workerMetrics = new HashMap<String, String>();
+        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+        conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
+        Map<String, String> workerMetrics = new HashMap<>();
         if (!NimbusClient.isLocalOverride()) {
             //sigar uses JNI and does not work in local mode
             workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
@@ -351,27 +224,27 @@ public class ThroughputVsLatency {
         conf.put(Config.TOPOLOGY_WORKER_METRICS, workerMetrics);
         conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
         conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
-                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
+            "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC "
+                + "-XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
         conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
 
         TopologyBuilder builder = new TopologyBuilder();
 
-        int numEach = 4 * parallelism;
-        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSecond/numEach), numEach);
+        builder.setSpout("spout", new FastRandomSentenceSpout((long) ratePerSecond / numSpouts), numSpouts);
+        builder.setBolt("split", new SplitSentence(), numSplits).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), numCounts).fieldsGrouping("split", new Fields("word"));
 
-        builder.setBolt("split", new SplitSentence(), numEach).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), numEach).fieldsGrouping("split", new Fields("word"));
-
-        try {
+        int exitStatus = -1;
+        try (ScopedTopologySet topologyNames = new ScopedTopologySet(client.getClient())) {
             StormSubmitter.submitTopology(name, conf, builder.createTopology());
+            topologyNames.add(name);
 
-            for (int i = 0; i < numMins * 2; i++) {
-                Thread.sleep(30 * 1000);
-                printMetrics(client.getClient(), name);
-            }
+            metricServer.monitorFor(numMins, client.getClient(), topologyNames);
+            exitStatus = 0;
+        } catch (Exception e) {
+            LOG.error("Error while running test", e);
         } finally {
-            kill(client.getClient(), name);
-            System.exit(0);
+            System.exit(exitStatus);
         }
     }
 }
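
[Editor's note] With the refactor above, FastRandomSentenceSpout shrinks to a LoadSpout subclass: rate pacing, ack/fail handling, and latency tracking now live in the base class, and the spout only supplies its payload. A sketch of what another spout looks like under the same contract (the number stream and "number" field are hypothetical; the LoadSpout constructor and getNextValues/OutputStreamEngine signatures are the ones this patch uses):

    import org.apache.storm.loadgen.LoadSpout;
    import org.apache.storm.loadgen.OutputStreamEngine;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class FastRandomNumberSpout extends LoadSpout {
        public FastRandomNumberSpout(long ratePerSecond) {
            super(ratePerSecond); //base class paces emits to this rate
        }

        @Override
        protected Values getNextValues(OutputStreamEngine se) {
            //Only the payload is spout-specific; pacing and metrics are inherited.
            return new Values(se.nextInt(1000));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }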