Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:46 UTC

[05/18] storm git commit: STORM-2702: fixed a bug and added in more metrics

STORM-2702: fixed a bug and added in more metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/211f8a94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/211f8a94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/211f8a94

Branch: refs/heads/master
Commit: 211f8a944c096b04083c211306687daeb16c7adc
Parents: 6c2dcbe
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Aug 23 12:49:41 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Aug 23 14:30:38 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |  73 ++++++---
 .../java/org/apache/storm/loadgen/GenLoad.java  |   8 +-
 .../apache/storm/loadgen/LoadMetricsServer.java | 157 ++++++++++++++-----
 .../storm/loadgen/ThroughputVsLatency.java      |   9 +-
 4 files changed, 182 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 8414768..e91409c 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -81,36 +81,57 @@ Not all options are supported by all reporters.
 |Reporter Option| Description | Supported Reporters|
 |---------------|-------------|--------------------|
 |time | Set the time unit that you want latency and CPU reported in.  This can be from nanoseconds up to seconds.  Most names are supported for the types| legacy, csv, tsv|
-|columns | A comma separated list of columns to output (see below for the metrics supported).  Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
-|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them (this is mostly for convenience) | csv, tsv |
+|columns | A comma-separated list of columns to output (see below for the metrics supported).  A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
+|extraColumns | Like `columns`, but these are added to the default columns instead of replacing them. A `*` is replaced by all metrics. | csv, tsv |
 |meta | An arbitrary string that will appear as a "meta" column at the end.  This helps when appending to files to keep different runs separated | csv, tsv|
 
 There are a lot of different metrics supported
 
-|Metrics Name| Description|
-|------------|------------|
-|99%ile| 99th percentile completion latency. |
-|99.9%ile| 99.9th percentile completion latency. |
-|median| Median completion latency. |
-|mean| Mean completion latency. |
-|min| Minimum completion latency. |
-|max| Maximum completion latency. |
-|stddev| Standard Deviation of completion latency. |
-|user_cpu| User space CPU time.|
-|sys_cpu| System space CPU time. |
-|gc_cpu| Amount of CPU time spent in GC as reported by the JVM. |
-|cores| The number of CPU cores used. `(user_cpu + sys_cpu) / time_window`|
-|uptime| The amount of time the oldest topology has been up for. |
-|acked| The number of tuples fully acked as reported by Storm's metrics. |
-|rate| The rate of tuples fully acked as reported by Storm's metrics. |
-|completed| The number of tuples fully acked as reported by the latency histogram metrics. |
-|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. |
-|mem| The amount of memory used by the topology in MB, as reported by the JVM. |
-|failed| The number of failed tuples as reported by Storm's metrics. |
-|start_time| The starting time of the metrics window from when the first topology was launched.
-|end_time| The ending time of the metrics window from the the first topology was launched.
-|time_window| the length in seconds for the time window. |
-|ids| The topology ids that are being tracked |
+|Metrics Name| Description| In |
+|------------|------------|----|
+|99%ile| 99th percentile completion latency. | all
+|99.9%ile| 99.9th percentile completion latency. | all
+|median| Median completion latency. | all
+|mean| Mean completion latency. | all
+|min| Minimum completion latency. | all
+|max| Maximum completion latency. | all
+|stddev| Standard Deviation of completion latency. | all
+|user_cpu| User space CPU time.| all
+|sys_cpu| System space CPU time. | all
+|gc_cpu| Amount of CPU time spent in GC as reported by the JVM. | all
+|cores| The number of CPU cores used. `(user_cpu + sys_cpu) / time_window`| all
+|uptime| The amount of time the oldest topology has been up for. | all
+|acked| The number of tuples fully acked as reported by Storm's metrics. | all
+|acked_rate| The rate of tuples fully acked as reported by Storm's metrics. | all
+|completed| The number of tuples fully acked as reported by the latency histogram metrics. | all
+|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
+|mem| The amount of memory used by the topology in MB, as reported by the JVM. | all
+|failed| The number of failed tuples as reported by Storm's metrics. | all
+|start_time| The starting time of the metrics window from when the first topology was launched. | all
+|end_time| The ending time of the metrics window from when the first topology was launched. | all
+|time_window| The length in seconds of the time window. | all
+|ids| The topology ids that are being tracked. | all
+|storm_version| The version of Storm as reported by the client. | all
+|java_version| The version of Java as reported by the client. | all
+|os_arch| The OS architecture as reported by the client. | all
+|os_name| The name of the OS as reported by the client. | all
+|os_version| The version of the OS as reported by the client. | all
+|config_override| Any command line overrides to Storm config values. | all
+|hosts| The number of hosts the monitored topologies are running on. | all
+|executors| The number of running executors in the monitored topologies. | all
+|workers| The number of workers the monitored topologies are running on. | all
+|target_rate| The target rate in sentences per second for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|spout_parallel| The parallelism of the spout for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|count_parallel| The parallelism of the count bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|parallel_adjust| The adjustment to the parallelism in `GenLoad`. | GenLoad
+|throughput_adjust| The adjustment to the throughput in `GenLoad`. | GenLoad
+|local_or_shuffle| True if shuffle groupings were replaced with local-or-shuffle groupings in `GenLoad`. | GenLoad
+
+There are also some generic rules that let you request columns beyond the named metrics.  Any column that starts with `conf:` will report the value of that key as read from the storm config.  It does not include config overrides from the `GenLoad` file.
+
+In addition, any column that ends with `%ile` will report the completion latency at that percentile, so `99.99%ile` is the 99.99th percentile completion latency.
+
 
 # Captured Load File Format
 The file format used with `CaptureLoad` and `GenLoad` is based off of the flux file format, but with some extensions and omissions.
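
The generic column rules documented above (`conf:`, `%ile`, and `*`) are implemented in `SepValReporter.handleExtractorCleanup` further down in this diff. Below is a minimal, self-contained sketch of the same expansion logic; the `Extractor` interface and the `"latency@"` lookup are illustrative stand-ins, not Storm APIs:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnExpansion {
        // Illustrative stand-in for MetricExtractor: measurement -> value.
        interface Extractor {
            Object extract(Map<String, Object> measurement);
        }

        // Mirrors the three generic rules handled by handleExtractorCleanup.
        static List<String> expand(List<String> requested,
                                   Map<String, Extractor> extractors,
                                   Map<String, Object> stormConf) {
            List<String> result = new ArrayList<>(requested.size());
            for (String col : requested) {
                if (col.startsWith("conf:")) {
                    // "conf:<key>" becomes a constant column holding that config value.
                    final Object value = stormConf.get(col.substring("conf:".length()));
                    extractors.put(col, m -> value);
                    result.add(col);
                } else if (col.endsWith("%ile")) {
                    // "<N>%ile" becomes a completion-latency percentile column.
                    final double pct = Double.valueOf(
                        col.substring(0, col.length() - "%ile".length()));
                    extractors.put(col, m -> m.get("latency@" + pct)); // illustrative lookup
                    result.add(col);
                } else if ("*".equals(col)) {
                    // "*" expands to every metric registered so far, in order.
                    result.addAll(extractors.keySet());
                } else {
                    result.add(col);
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Map<String, Extractor> extractors = new LinkedHashMap<>();
            extractors.put("mean", m -> m.get("mean"));
            extractors.put("99%ile", m -> m.get("latency@99.0"));
            Map<String, Object> conf = new LinkedHashMap<>();
            conf.put("topology.workers", 4);
            System.out.println(expand(
                Arrays.asList("*", "conf:topology.workers", "99.9%ile"),
                extractors, conf));
            // prints: [mean, 99%ile, conf:topology.workers, 99.9%ile]
        }
    }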

http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index 7998fdc..8821b2a 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -21,6 +21,7 @@ package org.apache.storm.loadgen;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -117,8 +118,13 @@ public class GenLoad {
             new HelpFormatter().printHelp("GenLoad [options] [captured_file]*", options);
             return;
         }
+        Map<String, Object> metrics = new LinkedHashMap<>();
+        metrics.put("parallel_adjust", parallel);
+        metrics.put("throughput_adjust", throughput);
+        metrics.put("local_or_shuffle", cmd.hasOption("local-or-shuffle"));
+
         Config conf = new Config();
-        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
 
         metricServer.serve();
         String url = metricServer.getUrl();
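
The `metrics` map built above is passed through to `LoadMetricsServer`, which (see the next file) wraps each entry in a constant-valued extractor so the launch parameters show up as ordinary columns. A minimal sketch of that capture pattern follows, with an illustrative `Extractor` interface in place of the real `MetricExtractor`:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ConstantColumns {
        // Illustrative stand-in for MetricExtractor: ignores the measurement
        // and always returns the value captured at construction time.
        interface Extractor {
            Object get();
        }

        public static void main(String[] args) {
            // Parameters a topology launcher wants reported alongside metrics.
            Map<String, Object> parameterMetrics = new LinkedHashMap<>();
            parameterMetrics.put("parallel_adjust", 2.0);
            parameterMetrics.put("throughput_adjust", 1.5);
            parameterMetrics.put("local_or_shuffle", true);

            Map<String, Extractor> extractors = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : parameterMetrics.entrySet()) {
                // Capture each value in a final local so the lambda closes over
                // a constant; the column then reports the same value every window.
                final Object value = entry.getValue();
                extractors.put(entry.getKey(), () -> value);
            }
            extractors.forEach((name, ex) ->
                System.out.println(name + " = " + ex.get()));
        }
    }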

http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index c126ca5..fd12247 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,8 @@ import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +99,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         private long acked;
         private long failed;
         private Set<String> topologyIds;
+        private long workers;
+        private long executors;
+        private long hosts;
 
         /**
          * Constructor.
@@ -105,7 +111,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
          * @param gcMs GC CPU in ms.
          */
         public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
-                            double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds) {
+                            double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
+                            long workers, long executors, long hosts) {
             this.uptimeSecs = uptimeSecs;
             this.acked = acked;
             this.timeWindow = timeWindow;
@@ -116,6 +123,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             this.histo = histo;
             this.memBytes = memBytes;
             this.topologyIds = topologyIds;
+            this.workers = workers;
+            this.executors = executors;
+            this.hosts = hosts;
         }
 
         /**
@@ -132,6 +142,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             acked = 0;
             failed = 0;
             topologyIds = new HashSet<>();
+            workers = 0;
+            executors = 0;
+            hosts = 0;
         }
 
         /**
@@ -149,6 +162,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             uptimeSecs = Math.max(uptimeSecs, other.uptimeSecs);
             timeWindow += other.timeWindow;
             topologyIds.addAll(other.topologyIds);
+            workers = Math.max(workers, other.workers);
+            executors = Math.max(executors, other.executors);
+            hosts = Math.max(hosts, other.hosts);
         }
 
         public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -227,6 +243,18 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             return topologyIds;
         }
 
+        public long getWorkers() {
+            return workers;
+        }
+
+        public long getHosts() {
+            return hosts;
+        }
+
+        public long getExecutors() {
+            return executors;
+        }
+
         static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
             if (count == null) {
                 count = measurements.size();
@@ -257,12 +285,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
     abstract static class FileReporter implements MetricResultsReporter {
         protected final PrintStream out;
         private final boolean needsClose;
+        protected final Map<String, MetricExtractor> allExtractors;
 
-        public FileReporter() throws FileNotFoundException {
-            this(null, null);
+        public FileReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+            this(null, Collections.emptyMap(), allExtractors);
         }
 
-        public FileReporter(String path, Map<String, String> query) throws FileNotFoundException {
+        public FileReporter(String path, Map<String, String> query,  Map<String, MetricExtractor> allExtractors)
+            throws FileNotFoundException {
             boolean append = Boolean.parseBoolean(query.getOrDefault("append", "false"));
 
             if (path == null || "/dev/stdout".equals(path)) {
@@ -275,6 +305,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                 out = new PrintStream(new FileOutputStream(path, append));
                 needsClose = true;
             }
+            //Copy it in case we want to modify it
+            this.allExtractors = new LinkedHashMap<>(allExtractors);
         }
 
         @Override
@@ -337,31 +369,42 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
 
     static {
         //Perhaps there is a better way to do this???
-        HashMap<String, MetricExtractor> tmp = new HashMap<>();
+        LinkedHashMap<String, MetricExtractor> tmp = new LinkedHashMap<>();
+        tmp.put("start_time",  new MetricExtractor((m, unit) -> m.startTime(),"s"));
+        tmp.put("end_time",  new MetricExtractor((m, unit) -> m.endTime(), "s"));
+        tmp.put("completion_rate",  new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+        tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
         tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
         tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
+        tmp.put("cores", new MetricExtractor(
+            (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
+            ""));
+        tmp.put("mem",  new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
+        tmp.put("failed",  new MetricExtractor((m, unit) -> m.getFailed(), ""));
         tmp.put("median", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(50, unit)));
-        tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
         tmp.put("min", new MetricExtractor((m, unit) -> m.getMinLatency(unit)));
         tmp.put("max", new MetricExtractor((m, unit) -> m.getMaxLatency(unit)));
         tmp.put("stddev", new MetricExtractor((m, unit) -> m.getLatencyStdDeviation(unit)));
         tmp.put("user_cpu", new MetricExtractor((m, unit) -> m.getUserTime(unit)));
         tmp.put("sys_cpu", new MetricExtractor((m, unit) -> m.getSysTime(unit)));
         tmp.put("gc_cpu", new MetricExtractor((m, unit) -> m.getGc(unit)));
-        tmp.put("cores", new MetricExtractor(
-            (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
-            ""));
-        tmp.put("uptime",  new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
         tmp.put("acked",  new MetricExtractor((m, unit) -> m.getAcked(), ""));
-        tmp.put("rate",  new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
+        tmp.put("acked_rate",  new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
         tmp.put("completed",  new MetricExtractor((m, unit) -> m.getCompleted(), ""));
-        tmp.put("completion_rate",  new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
-        tmp.put("mem",  new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
-        tmp.put("failed",  new MetricExtractor((m, unit) -> m.getFailed(), ""));
-        tmp.put("start_time",  new MetricExtractor((m, unit) -> m.startTime(),"s"));
-        tmp.put("end_time",  new MetricExtractor((m, unit) -> m.endTime(), "s"));
+        tmp.put("uptime",  new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
         tmp.put("time_window",  new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
         tmp.put("ids",  new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+        tmp.put("workers",  new MetricExtractor((m, unit) -> m.getWorkers(), ""));
+        tmp.put("hosts",  new MetricExtractor((m, unit) -> m.getHosts(), ""));
+        tmp.put("executors",  new MetricExtractor((m, unit) -> m.getExecutors(), ""));
+        String buildVersion = VersionInfo.getBuildVersion();
+        tmp.put("storm_version", new MetricExtractor((m, unit) -> buildVersion, ""));
+        tmp.put("java_version", new MetricExtractor((m, unit) -> System.getProperty("java.vendor")
+            + " " + System.getProperty("java.version"),""));
+        tmp.put("os_arch", new MetricExtractor((m, unit) -> System.getProperty("os.arch"), ""));
+        tmp.put("os_name", new MetricExtractor((m, unit) -> System.getProperty("os.name"), ""));
+        tmp.put("os_version", new MetricExtractor((m, unit) -> System.getProperty("os.version"), ""));
+        tmp.put("config_override", new MetricExtractor((m, unit) -> Utils.readCommandLineOpts(), ""));
         NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
     }
 
@@ -405,20 +448,23 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         private final List<String> extractors;
         private final String meta;
 
-        public SepValReporter(String separator, String path, Map<String, String> query) throws FileNotFoundException {
-            super(path, query);
+        public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+            throws FileNotFoundException {
+            super(path, query, extractorsMap);
             this.separator = separator;
             targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
             if (targetUnit == null) {
                 throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
             }
             if (query.containsKey("columns")) {
-                extractors = Arrays.asList(query.get("columns").split("\\s*,\\s*"));
+                List<String> extractors = handleExtractorCleanup(Arrays.asList(query.get("columns").split("\\s*,\\s*")));
+
                 HashSet<String> notFound = new HashSet<>(extractors);
-                notFound.removeAll(NAMED_EXTRACTORS.keySet());
+                notFound.removeAll(allExtractors.keySet());
                 if (notFound.size() > 0) {
                     throw new IllegalArgumentException(notFound + " columns are not supported");
                 }
+                this.extractors = extractors;
             } else {
                 //Wrapping it makes it mutable
                 extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
@@ -426,9 +472,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             }
 
             if (query.containsKey("extraColumns")) {
-                List<String> moreExtractors = Arrays.asList(query.get("extraColumns").split("\\s*,\\s*"));
+                List<String> moreExtractors =
+                    handleExtractorCleanup(Arrays.asList(query.get("extraColumns").split("\\s*,\\s*")));
                 for (String extractor: moreExtractors) {
-                    if (!NAMED_EXTRACTORS.containsKey(extractor)) {
+                    if (!allExtractors.containsKey(extractor)) {
                         throw new IllegalArgumentException(extractor + " is not a supported column");
                     }
                     if (!extractors.contains(extractor)) {
@@ -440,6 +487,28 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             meta = query.get("meta");
         }
 
+        private List<String> handleExtractorCleanup(List<String> orig) {
+            Map<String, Object> stormConfig = Utils.readStormConfig();
+            List<String> ret = new ArrayList<>(orig.size());
+            for (String extractor: orig) {
+                if (extractor.startsWith("conf:")) {
+                    String confKey = extractor.substring("conf:".length());
+                    Object confValue = stormConfig.get(confKey);
+                    allExtractors.put(extractor, new MetricExtractor((m, t) -> confValue, ""));
+                    ret.add(extractor);
+                } else if (extractor.endsWith("%ile")) {
+                    double number = Double.valueOf(extractor.substring(0, extractor.length() - "%ile".length()));
+                    allExtractors.put(extractor, new MetricExtractor((m, t) -> m.getLatencyAtPercentile(number, t)));
+                    ret.add(extractor);
+                } else if ("*".equals(extractor)) {
+                    ret.addAll(allExtractors.keySet());
+                } else {
+                    ret.add(extractor);
+                }
+            }
+            return ret;
+        }
+
         @Override
         public void start() {
             boolean first = true;
@@ -448,7 +517,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                     out.print(separator);
                 }
                 first = false;
-                out.print(NAMED_EXTRACTORS.get(name).formatName(name, targetUnit));
+                out.print(allExtractors.get(name).formatName(name, targetUnit));
             }
             if (meta != null) {
                 out.print(separator);
@@ -465,7 +534,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                     out.print(separator);
                 }
                 first = false;
-                Object value = NAMED_EXTRACTORS.get(name).get(m, targetUnit);
+                Object value = allExtractors.get(name).get(m, targetUnit);
                 String svalue = value == null ? "" : value.toString();
                 out.print(escape(svalue));
             }
@@ -484,13 +553,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
     static class LegacyReporter extends FileReporter {
         private final TimeUnit targetUnitOverride;
 
-        public LegacyReporter() throws FileNotFoundException {
-            super();
+        public LegacyReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+            super(allExtractors);
             targetUnitOverride = null;
         }
 
-        public LegacyReporter(String path, Map<String, String> query) throws FileNotFoundException {
-            super(path, query);
+        public LegacyReporter(String path, Map<String, String> query, Map<String, MetricExtractor> allExtractors)
+            throws FileNotFoundException {
+            super(path, query, allExtractors);
             if (query.containsKey("time")) {
                 targetUnitOverride = UNIT_MAP.get(query.get("time").toUpperCase());
                 if (targetUnitOverride == null) {
@@ -540,7 +610,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         options.addOption(Option.builder("r")
             .longOpt("report-interval")
             .hasArg()
-            .argName("INTERVAL_SECS")
+            .argName("SECS")
             .desc("How long in between reported metrics.  Will be rounded up to the next 10 sec boundary.\n"
                 + "default " + DEFAULT_REPORT_INTERVAL)
             .build());
@@ -548,7 +618,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         options.addOption(Option.builder("w")
             .longOpt("report-window")
             .hasArg()
-            .argName("INTERVAL_SECS")
+            .argName("SECS")
             .desc("How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary.\n"
                 + "default " + DEFAULT_WINDOW_INTERVAL)
             .build());
@@ -585,8 +655,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
 
     private final LinkedList<Measurements> allCombined = new LinkedList<>();
 
-    LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine) throws URISyntaxException, FileNotFoundException {
+    LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine, Map<String, Object> parameterMetrics) throws URISyntaxException,
+        FileNotFoundException {
         super(conf);
+        Map<String, MetricExtractor> allExtractors = new LinkedHashMap<>(NAMED_EXTRACTORS);
+        for (Map.Entry<String, Object> entry: parameterMetrics.entrySet()) {
+            final Object value = entry.getValue();
+            allExtractors.put(entry.getKey(), new MetricExtractor((m, unit) -> value, ""));
+        }
         if (commandLine.hasOption("r")) {
             reportIntervalSecs = Long.parseLong(commandLine.getOptionValue("r"));
             reportIntervalSecs = ((reportIntervalSecs + 1) / 10) * 10;
@@ -600,7 +676,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             for (String reporterString: commandLine.getOptionValues("reporter")) {
                 Matcher m = REPORTER_PATTERN.matcher(reporterString);
                 if (!m.matches()) {
-                    throw new IllegalArgumentException(reporterString + " does nto look like it is a reporter");
+                    throw new IllegalArgumentException(reporterString + " does not look like it is a reporter");
                 }
                 String type = m.group("type");
                 String path = m.group("path");
@@ -617,20 +693,20 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                 type = type.toUpperCase();
                 switch (type) {
                     case "LEGACY":
-                        reporters.add(new LegacyReporter(path, query));
+                        reporters.add(new LegacyReporter(path, query, allExtractors));
                         break;
                     case "TSV":
-                        reporters.add(new SepValReporter("\t", path, query));
+                        reporters.add(new SepValReporter("\t", path, query, allExtractors));
                         break;
                     case "CSV":
-                        reporters.add(new SepValReporter(",", path, query));
+                        reporters.add(new SepValReporter(",", path, query, allExtractors));
                         break;
                     default:
                         throw new RuntimeException(type + " is not a supported reporter type");
                 }
             }
         } else {
-            reporters.add(new LegacyReporter());
+            reporters.add(new LegacyReporter(allExtractors));
         }
     }
 
@@ -682,6 +758,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         if (ids.size() != names.size()) {
             throw new Exception("Could not find all topologies: " + names);
         }
+        HashSet<String> workers = new HashSet<>();
+        HashSet<String> hosts = new HashSet<>();
+        int executors = 0;
         int uptime = 0;
         long acked = 0;
         long failed = 0;
@@ -689,6 +768,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             TopologyInfo info = client.getTopologyInfo(id);
             uptime = Math.max(uptime, info.get_uptime_secs());
             for (ExecutorSummary exec : info.get_executors()) {
+                hosts.add(exec.get_host());
+                workers.add(exec.get_host() + exec.get_port());
+                executors++;
                 if (exec.get_stats() != null && exec.get_stats().get_specific() != null
                     && exec.get_stats().get_specific().is_set_spout()) {
                     SpoutStats stats = exec.get_stats().get_specific().get_spout();
@@ -729,7 +811,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         long gc = gcMs.getAndSet(0);
         long memBytes = readMemory();
 
-        allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes, ids));
+        allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
+            ids, workers.size(), executors, hosts.size()));
         Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
         for (MetricResultsReporter reporter: reporters) {
             reporter.reportWindow(inWindow, allCombined);
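
The new `hosts`, `workers`, and `executors` measurements are gathered while polling each topology: every executor summary contributes its host to a host set, its host/port pair to a worker set, and one to a running executor count, and only the set sizes are reported. A self-contained sketch of that bookkeeping, using a plain list of host/port pairs as an illustrative stand-in for walking Storm's `ExecutorSummary` objects:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ClusterFootprint {
        public static void main(String[] args) {
            // Each entry is an executor's {host, port}; an illustrative stand-in
            // for iterating TopologyInfo.get_executors() in the real code.
            List<String[]> executors = Arrays.asList(
                new String[] {"node-a", "6700"},
                new String[] {"node-a", "6701"},
                new String[] {"node-a", "6700"},
                new String[] {"node-b", "6700"});

            Set<String> hosts = new HashSet<>();
            Set<String> workers = new HashSet<>();
            int executorCount = 0;
            for (String[] exec : executors) {
                hosts.add(exec[0]);                   // distinct supervisor hosts
                workers.add(exec[0] + ":" + exec[1]); // a worker slot is (host, port)
                executorCount++;
            }
            System.out.println("hosts=" + hosts.size()
                + " workers=" + workers.size()
                + " executors=" + executorCount);
            // prints: hosts=2 workers=3 executors=4
        }
    }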

http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 2c22b42..3d3a271 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -19,6 +19,7 @@
 package org.apache.storm.loadgen;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
@@ -208,8 +209,14 @@ public class ThroughputVsLatency {
             return;
         }
 
+        Map<String, Object> metrics = new LinkedHashMap<>();
+        metrics.put("target_rate", ratePerSecond);
+        metrics.put("spout_parallel", numSpouts);
+        metrics.put("split_parallel", numSplits);
+        metrics.put("count_parallel", numCounts);
+
         Config conf = new Config();
-        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
         metricServer.serve();
         String url = metricServer.getUrl();