You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:46 UTC
[05/18] storm git commit: STORM-2702: fixed a bug and added in more metrics
STORM-2702: fixed a bug and added in more metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/211f8a94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/211f8a94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/211f8a94
Branch: refs/heads/master
Commit: 211f8a944c096b04083c211306687daeb16c7adc
Parents: 6c2dcbe
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Aug 23 12:49:41 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Aug 23 14:30:38 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 73 ++++++---
.../java/org/apache/storm/loadgen/GenLoad.java | 8 +-
.../apache/storm/loadgen/LoadMetricsServer.java | 157 ++++++++++++++-----
.../storm/loadgen/ThroughputVsLatency.java | 9 +-
4 files changed, 182 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 8414768..e91409c 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -81,36 +81,57 @@ Not all options are supported by all reporters.
|Reporter Option| Description | Supported Reporters|
|---------------|-------------|--------------------|
|time | Set the time unit that you want latency and CPU reported in. This can be from nanoseconds up to seconds. Most names are supported for the types| legacy, csv, tsv|
-|columns | A comma separated list of columns to output (see below for the metrics supported). Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
-|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them (this is mostly for convenience) | csv, tsv |
+|columns | A comma separated list of columns to output (see below for the metrics supported). A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
+|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv |
|meta | An arbitrary string that will appear as a "meta" column at the end. This helps when appending to files to keep different runs separated | csv, tsv|
There are a lot of different metrics supported
-|Metrics Name| Description|
-|------------|------------|
-|99%ile| 99th percentile completion latency. |
-|99.9%ile| 99.9th percentile completion latency. |
-|median| Median completion latency. |
-|mean| Mean completion latency. |
-|min| Minimum completion latency. |
-|max| Maximum completion latency. |
-|stddev| Standard Deviation of completion latency. |
-|user_cpu| User space CPU time.|
-|sys_cpu| System space CPU time. |
-|gc_cpu| Amount of CPU time spent in GC as reported by the JVM. |
-|cores| The number of CPU cores used. `(user_cpu + sys_cpu) / time_window`|
-|uptime| The amount of time the oldest topology has been up for. |
-|acked| The number of tuples fully acked as reported by Storm's metrics. |
-|rate| The rate of tuples fully acked as reported by Storm's metrics. |
-|completed| The number of tuples fully acked as reported by the latency histogram metrics. |
-|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. |
-|mem| The amount of memory used by the topology in MB, as reported by the JVM. |
-|failed| The number of failed tuples as reported by Storm's metrics. |
-|start_time| The starting time of the metrics window from when the first topology was launched.
-|end_time| The ending time of the metrics window from the the first topology was launched.
-|time_window| the length in seconds for the time window. |
-|ids| The topology ids that are being tracked |
+|Metrics Name| Description| In |
+|------------|------------|----|
+|99%ile| 99th percentile completion latency. | all
+|99.9%ile| 99.9th percentile completion latency. | all
+|median| Median completion latency. | all
+|mean| Mean completion latency. | all
+|min| Minimum completion latency. | all
+|max| Maximum completion latency. | all
+|stddev| Standard Deviation of completion latency. | all
+|user_cpu| User space CPU time.| all
+|sys_cpu| System space CPU time. | all
+|gc_cpu| Amount of CPU time spent in GC as reported by the JVM. | all
+|cores| The number of CPU cores used. `(user_cpu + sys_cpu) / time_window`| all
+|uptime| The amount of time the oldest topology has been up for. | all
+|acked| The number of tuples fully acked as reported by Storm's metrics. | all
+|acked_rate| The rate of tuples fully acked as reported by Storm's metrics. | all
+|completed| The number of tuples fully acked as reported by the latency histogram metrics. | all
+|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
+|mem| The amount of memory used by the topology in MB, as reported by the JVM. | all
+|failed| The number of failed tuples as reported by Storm's metrics. | all
+|start_time| The starting time of the metrics window from when the first topology was launched. | all
+|end_time| The ending time of the metrics window from when the first topology was launched. | all
+|time_window| the length in seconds for the time window. | all
+|ids| The topology ids that are being tracked | all
+|storm_version| The version of storm as reported by the client | all
+|java_version| The version of java as reported by the client | all
+|os_arch| The OS architecture as reported by the client | all
+|os_name| The name of the OS as reported by the client | all
+|os_version| The version of the OS as reported by the client | all
+|config_override| Any command line overrides to storm config values | all
+|hosts| The number of hosts the monitored topologies are running on| all
+|executors| The number of running executors in the monitored topologies | all
+|workers| The number of workers the monitored topologies are running on | all
+|target_rate| The target rate in sentences per second for the ThroughputVsLatency topology | ThroughputVsLatency
+|spout_parallel| The parallelism of the spout for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|count_parallel| The parallelism of the count bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
+|parallel\_adjust| The adjustment to the parallelism in `GenLoad`. | GenLoad
+|throughput_adjust| The adjustment to the throughput in `GenLoad`. | GenLoad
+|local\_or\_shuffle| true if shuffles were replaced with local or shuffle in GenLoad. | GenLoad
+
+There are also some generic rules that you can use for some metrics. Any metric that starts with `"conf:"` will report the value of the storm config whose key follows the prefix (for example `conf:topology.workers`). It does not include config overrides from the `GenLoad` file.
+
+In addition any metric that ends with `"%ile"` will be the latency at that percentile.
+
# Captured Load File Format
The file format used with `CaptureLoad` and `GenLoad` is based off of the flux file format, but with some extensions and omissions.
http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index 7998fdc..8821b2a 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -21,6 +21,7 @@ package org.apache.storm.loadgen;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -117,8 +118,13 @@ public class GenLoad {
new HelpFormatter().printHelp("GenLoad [options] [captured_file]*", options);
return;
}
+ Map<String, Object> metrics = new LinkedHashMap<>();
+ metrics.put("parallel_adjust", parallel);
+ metrics.put("throughput_adjust", throughput);
+ metrics.put("local_or_shuffle", cmd.hasOption("local-or-shuffle"));
+
Config conf = new Config();
- LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+ LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
metricServer.serve();
String url = metricServer.getUrl();
http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index c126ca5..fd12247 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,8 @@ import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +99,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
private long acked;
private long failed;
private Set<String> topologyIds;
+ private long workers;
+ private long executors;
+ private long hosts;
/**
* Constructor.
@@ -105,7 +111,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
* @param gcMs GC CPU in ms.
*/
public Measurements(long uptimeSecs, long acked, long timeWindow, long failed, Histogram histo,
- double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds) {
+ double userMs, double sysMs, double gcMs, long memBytes, Set<String> topologyIds,
+ long workers, long executors, long hosts) {
this.uptimeSecs = uptimeSecs;
this.acked = acked;
this.timeWindow = timeWindow;
@@ -116,6 +123,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
this.histo = histo;
this.memBytes = memBytes;
this.topologyIds = topologyIds;
+ this.workers = workers;
+ this.executors = executors;
+ this.hosts = hosts;
}
/**
@@ -132,6 +142,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
acked = 0;
failed = 0;
topologyIds = new HashSet<>();
+ workers = 0;
+ executors = 0;
+ hosts = 0;
}
/**
@@ -149,6 +162,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
uptimeSecs = Math.max(uptimeSecs, other.uptimeSecs);
timeWindow += other.timeWindow;
topologyIds.addAll(other.topologyIds);
+ workers = Math.max(workers, other.workers);
+ executors = Math.max(executors, other.executors);
+ hosts = Math.max(hosts, other.hosts);
}
public double getLatencyAtPercentile(double percential, TimeUnit unit) {
@@ -227,6 +243,18 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
return topologyIds;
}
+ public long getWorkers() {
+ return workers;
+ }
+
+ public long getHosts() {
+ return hosts;
+ }
+
+ public long getExecutors() {
+ return executors;
+ }
+
static Measurements combine(List<Measurements> measurements, Integer start, Integer count) {
if (count == null) {
count = measurements.size();
@@ -257,12 +285,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
abstract static class FileReporter implements MetricResultsReporter {
protected final PrintStream out;
private final boolean needsClose;
+ protected final Map<String, MetricExtractor> allExtractors;
- public FileReporter() throws FileNotFoundException {
- this(null, null);
+ public FileReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+ this(null, Collections.emptyMap(), allExtractors);
}
- public FileReporter(String path, Map<String, String> query) throws FileNotFoundException {
+ public FileReporter(String path, Map<String, String> query, Map<String, MetricExtractor> allExtractors)
+ throws FileNotFoundException {
boolean append = Boolean.parseBoolean(query.getOrDefault("append", "false"));
if (path == null || "/dev/stdout".equals(path)) {
@@ -275,6 +305,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
out = new PrintStream(new FileOutputStream(path, append));
needsClose = true;
}
+ //Copy it in case we want to modify it
+ this.allExtractors = new LinkedHashMap<>(allExtractors);
}
@Override
@@ -337,31 +369,42 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
static {
//Perhaps there is a better way to do this???
- HashMap<String, MetricExtractor> tmp = new HashMap<>();
+ LinkedHashMap<String, MetricExtractor> tmp = new LinkedHashMap<>();
+ tmp.put("start_time", new MetricExtractor((m, unit) -> m.startTime(),"s"));
+ tmp.put("end_time", new MetricExtractor((m, unit) -> m.endTime(), "s"));
+ tmp.put("completion_rate", new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+ tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
+ tmp.put("cores", new MetricExtractor(
+ (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
+ ""));
+ tmp.put("mem", new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
+ tmp.put("failed", new MetricExtractor((m, unit) -> m.getFailed(), ""));
tmp.put("median", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(50, unit)));
- tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
tmp.put("min", new MetricExtractor((m, unit) -> m.getMinLatency(unit)));
tmp.put("max", new MetricExtractor((m, unit) -> m.getMaxLatency(unit)));
tmp.put("stddev", new MetricExtractor((m, unit) -> m.getLatencyStdDeviation(unit)));
tmp.put("user_cpu", new MetricExtractor((m, unit) -> m.getUserTime(unit)));
tmp.put("sys_cpu", new MetricExtractor((m, unit) -> m.getSysTime(unit)));
tmp.put("gc_cpu", new MetricExtractor((m, unit) -> m.getGc(unit)));
- tmp.put("cores", new MetricExtractor(
- (m, unit) -> (m.getSysTime(TimeUnit.SECONDS) + m.getUserTime(TimeUnit.SECONDS)) / m.getTimeWindow(),
- ""));
- tmp.put("uptime", new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
tmp.put("acked", new MetricExtractor((m, unit) -> m.getAcked(), ""));
- tmp.put("rate", new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
+ tmp.put("acked_rate", new MetricExtractor((m, unit) -> m.getAckedPerSec(), "tuple/s"));
tmp.put("completed", new MetricExtractor((m, unit) -> m.getCompleted(), ""));
- tmp.put("completion_rate", new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
- tmp.put("mem", new MetricExtractor((m, unit) -> m.getMemMb(), "MB"));
- tmp.put("failed", new MetricExtractor((m, unit) -> m.getFailed(), ""));
- tmp.put("start_time", new MetricExtractor((m, unit) -> m.startTime(),"s"));
- tmp.put("end_time", new MetricExtractor((m, unit) -> m.endTime(), "s"));
+ tmp.put("uptime", new MetricExtractor((m, unit) -> m.getUptimeSecs(), "s"));
tmp.put("time_window", new MetricExtractor((m, unit) -> m.getTimeWindow(), "s"));
tmp.put("ids", new MetricExtractor((m, unit) -> m.getTopologyIds(), ""));
+ tmp.put("workers", new MetricExtractor((m, unit) -> m.getWorkers(), ""));
+ tmp.put("hosts", new MetricExtractor((m, unit) -> m.getHosts(), ""));
+ tmp.put("executors", new MetricExtractor((m, unit) -> m.getExecutors(), ""));
+ String buildVersion = VersionInfo.getBuildVersion();
+ tmp.put("storm_version", new MetricExtractor((m, unit) -> buildVersion, ""));
+ tmp.put("java_version", new MetricExtractor((m, unit) -> System.getProperty("java.vendor")
+ + " " + System.getProperty("java.version"),""));
+ tmp.put("os_arch", new MetricExtractor((m, unit) -> System.getProperty("os.arch"), ""));
+ tmp.put("os_name", new MetricExtractor((m, unit) -> System.getProperty("os.name"), ""));
+ tmp.put("os_version", new MetricExtractor((m, unit) -> System.getProperty("os.version"), ""));
+ tmp.put("config_override", new MetricExtractor((m, unit) -> Utils.readCommandLineOpts(), ""));
NAMED_EXTRACTORS = Collections.unmodifiableMap(tmp);
}
@@ -405,20 +448,23 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
private final List<String> extractors;
private final String meta;
- public SepValReporter(String separator, String path, Map<String, String> query) throws FileNotFoundException {
- super(path, query);
+ public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+ throws FileNotFoundException {
+ super(path, query, extractorsMap);
this.separator = separator;
targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
if (targetUnit == null) {
throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
}
if (query.containsKey("columns")) {
- extractors = Arrays.asList(query.get("columns").split("\\s*,\\s*"));
+ List<String> extractors = handleExtractorCleanup(Arrays.asList(query.get("columns").split("\\s*,\\s*")));
+
HashSet<String> notFound = new HashSet<>(extractors);
- notFound.removeAll(NAMED_EXTRACTORS.keySet());
+ notFound.removeAll(allExtractors.keySet());
if (notFound.size() > 0) {
throw new IllegalArgumentException(notFound + " columns are not supported");
}
+ this.extractors = extractors;
} else {
//Wrapping it makes it mutable
extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
@@ -426,9 +472,10 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
if (query.containsKey("extraColumns")) {
- List<String> moreExtractors = Arrays.asList(query.get("extraColumns").split("\\s*,\\s*"));
+ List<String> moreExtractors =
+ handleExtractorCleanup(Arrays.asList(query.get("extraColumns").split("\\s*,\\s*")));
for (String extractor: moreExtractors) {
- if (!NAMED_EXTRACTORS.containsKey(extractor)) {
+ if (!allExtractors.containsKey(extractor)) {
throw new IllegalArgumentException(extractor + " is not a supported column");
}
if (!extractors.contains(extractor)) {
@@ -440,6 +487,28 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
meta = query.get("meta");
}
+ private List<String> handleExtractorCleanup(List<String> orig) {
+ Map<String, Object> stormConfig = Utils.readStormConfig();
+ List<String> ret = new ArrayList<>(orig.size());
+ for (String extractor: orig) {
+ if (extractor.startsWith("conf:")) {
+ String confKey = extractor.substring("conf:".length());
+ Object confValue = stormConfig.get(confKey);
+ allExtractors.put(extractor, new MetricExtractor((m, t) -> confValue, ""));
+ ret.add(extractor);
+ } else if (extractor.endsWith("%ile")) {
+ double number = Double.valueOf(extractor.substring(0, extractor.length() - "%ile".length()));
+ allExtractors.put(extractor, new MetricExtractor((m, t) -> m.getLatencyAtPercentile(number, t)));
+ ret.add(extractor);
+ } else if ("*".equals(extractor)) {
+ ret.addAll(allExtractors.keySet());
+ } else {
+ ret.add(extractor);
+ }
+ }
+ return ret;
+ }
+
@Override
public void start() {
boolean first = true;
@@ -448,7 +517,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
out.print(separator);
}
first = false;
- out.print(NAMED_EXTRACTORS.get(name).formatName(name, targetUnit));
+ out.print(allExtractors.get(name).formatName(name, targetUnit));
}
if (meta != null) {
out.print(separator);
@@ -465,7 +534,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
out.print(separator);
}
first = false;
- Object value = NAMED_EXTRACTORS.get(name).get(m, targetUnit);
+ Object value = allExtractors.get(name).get(m, targetUnit);
String svalue = value == null ? "" : value.toString();
out.print(escape(svalue));
}
@@ -484,13 +553,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
static class LegacyReporter extends FileReporter {
private final TimeUnit targetUnitOverride;
- public LegacyReporter() throws FileNotFoundException {
- super();
+ public LegacyReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+ super(allExtractors);
targetUnitOverride = null;
}
- public LegacyReporter(String path, Map<String, String> query) throws FileNotFoundException {
- super(path, query);
+ public LegacyReporter(String path, Map<String, String> query, Map<String, MetricExtractor> allExtractors)
+ throws FileNotFoundException {
+ super(path, query, allExtractors);
if (query.containsKey("time")) {
targetUnitOverride = UNIT_MAP.get(query.get("time").toUpperCase());
if (targetUnitOverride == null) {
@@ -540,7 +610,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
options.addOption(Option.builder("r")
.longOpt("report-interval")
.hasArg()
- .argName("INTERVAL_SECS")
+ .argName("SECS")
.desc("How long in between reported metrics. Will be rounded up to the next 10 sec boundary.\n"
+ "default " + DEFAULT_REPORT_INTERVAL)
.build());
@@ -548,7 +618,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
options.addOption(Option.builder("w")
.longOpt("report-window")
.hasArg()
- .argName("INTERVAL_SECS")
+ .argName("SECS")
.desc("How long of a rolling window should be in each report. Will be rounded up to the next report interval boundary.\n"
+ "default " + DEFAULT_WINDOW_INTERVAL)
.build());
@@ -585,8 +655,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
private final LinkedList<Measurements> allCombined = new LinkedList<>();
- LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine) throws URISyntaxException, FileNotFoundException {
+ LoadMetricsServer(Map<String, Object> conf, CommandLine commandLine, Map<String, Object> parameterMetrics) throws URISyntaxException,
+ FileNotFoundException {
super(conf);
+ Map<String, MetricExtractor> allExtractors = new LinkedHashMap<>(NAMED_EXTRACTORS);
+ for (Map.Entry<String, Object> entry: parameterMetrics.entrySet()) {
+ final Object value = entry.getValue();
+ allExtractors.put(entry.getKey(), new MetricExtractor((m, unit) -> value, ""));
+ }
if (commandLine.hasOption("r")) {
reportIntervalSecs = Long.parseLong(commandLine.getOptionValue("r"));
reportIntervalSecs = ((reportIntervalSecs + 1) / 10) * 10;
@@ -600,7 +676,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
for (String reporterString: commandLine.getOptionValues("reporter")) {
Matcher m = REPORTER_PATTERN.matcher(reporterString);
if (!m.matches()) {
- throw new IllegalArgumentException(reporterString + " does nto look like it is a reporter");
+ throw new IllegalArgumentException(reporterString + " does not look like it is a reporter");
}
String type = m.group("type");
String path = m.group("path");
@@ -617,20 +693,20 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
type = type.toUpperCase();
switch (type) {
case "LEGACY":
- reporters.add(new LegacyReporter(path, query));
+ reporters.add(new LegacyReporter(path, query, allExtractors));
break;
case "TSV":
- reporters.add(new SepValReporter("\t", path, query));
+ reporters.add(new SepValReporter("\t", path, query, allExtractors));
break;
case "CSV":
- reporters.add(new SepValReporter(",", path, query));
+ reporters.add(new SepValReporter(",", path, query, allExtractors));
break;
default:
throw new RuntimeException(type + " is not a supported reporter type");
}
}
} else {
- reporters.add(new LegacyReporter());
+ reporters.add(new LegacyReporter(allExtractors));
}
}
@@ -682,6 +758,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
if (ids.size() != names.size()) {
throw new Exception("Could not find all topologies: " + names);
}
+ HashSet<String> workers = new HashSet<>();
+ HashSet<String> hosts = new HashSet<>();
+ int executors = 0;
int uptime = 0;
long acked = 0;
long failed = 0;
@@ -689,6 +768,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
TopologyInfo info = client.getTopologyInfo(id);
uptime = Math.max(uptime, info.get_uptime_secs());
for (ExecutorSummary exec : info.get_executors()) {
+ hosts.add(exec.get_host());
+ workers.add(exec.get_host() + exec.get_port());
+ executors++;
if (exec.get_stats() != null && exec.get_stats().get_specific() != null
&& exec.get_stats().get_specific().is_set_spout()) {
SpoutStats stats = exec.get_stats().get_specific().get_spout();
@@ -729,7 +811,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
long gc = gcMs.getAndSet(0);
long memBytes = readMemory();
- allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes, ids));
+ allCombined.add(new Measurements(uptime, ackedThisTime, thisTime, failedThisTime, copy, user, sys, gc, memBytes,
+ ids, workers.size(), executors, hosts.size()));
Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
for (MetricResultsReporter reporter: reporters) {
reporter.reportWindow(inWindow, allCombined);
http://git-wip-us.apache.org/repos/asf/storm/blob/211f8a94/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 2c22b42..3d3a271 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -19,6 +19,7 @@
package org.apache.storm.loadgen;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
@@ -208,8 +209,14 @@ public class ThroughputVsLatency {
return;
}
+ Map<String, Object> metrics = new LinkedHashMap<>();
+ metrics.put("target_rate", ratePerSecond);
+ metrics.put("spout_parallel", numSpouts);
+ metrics.put("split_parallel", numSplits);
+ metrics.put("count_parallel", numCounts);
+
Config conf = new Config();
- LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd);
+ LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
metricServer.serve();
String url = metricServer.getUrl();