Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:50 UTC
[09/18] storm git commit: STORM-2702: Addressed the rest of the review comments
STORM-2702: Addressed the rest of the review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e3fc5a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e3fc5a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e3fc5a0
Branch: refs/heads/master
Commit: 0e3fc5a097d156bad4cf17c74c147255575aa646
Parents: 6bc3213
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 11:13:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 11:13:08 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 21 ++--
.../apache/storm/loadgen/LoadMetricsServer.java | 116 ++++++++++++++++---
2 files changed, 116 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0e3fc5a0/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 50a618f..1e5c69d 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -72,9 +72,12 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency [option
# Reporters
Reporters provide a way to store various statistics about a running topology. There are currently a few supported reporters
- * legacy - report values like ThroughputVsLatency has done in the past
- * TSV - tab separated values
- * CSV - comma separated values
+ * `legacy` - report values like ThroughputVsLatency has done in the past
+ * `tsv` - tab separated values
+ * `csv` - comma separated values
+ * `fixed` - a human readable fixed width format
+
+A `fixed` reporter to stdout will be added if no other reporters are writing to stdout or stderr.
All of these types can have their data written out to a file. To do this add a path after the type. For example `legacy:./legacy_data` or `tsv:my_run.tsv`. By default the file will be overwritten unless an option is given to append instead. Options are in a URL-like format, with a `?` separating the type:path from the options, and all of the options separated by a `&`. To append to the file you can do something like `csv:./my_run.csv?append` or `csv:./my_run.csv?append=true`.
@@ -82,10 +85,12 @@ Not all options are supported by all reporters.
|Reporter Option| Description | Supported Reporters|
|---------------|-------------|--------------------|
-|time | Set the time unit that you want latency and CPU reported in. This can be from nanoseconds up to seconds. Most names are supported for the types| legacy, csv, tsv|
-|columns | A comma separated list of columns to output (see below for the metrics supported). A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
-|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv |
-|meta | An arbitrary string that will appear as a "meta" column at the end. This helps when appending to files to keep different runs separated | csv, tsv|
+|time | Set the time unit that you want latency and CPU reported in. This can be from nanoseconds up to seconds. Most names are supported for the types| legacy, csv, tsv, fixed|
+|columns | A comma separated list of columns to output (see below for the metrics supported). A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids" | csv, tsv, fixed |
+|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv, fixed |
+|meta | An arbitrary string that will appear as a "meta" column at the end. This helps when appending to files to keep different runs separated | csv, tsv, fixed|
+|columnWidth | The width of each field | fixed|
+|precision | The number of places after the decimal point to print out | fixed|
There are a lot of different metrics supported
@@ -106,7 +111,7 @@ There are a lot of different metrics supported
|acked| The number of tuples fully acked as reported by Storm's metrics. | all
|acked_rate| The rate of tuples fully acked as reported by Storm's metrics. | all
|completed| The number of tuples fully acked as reported by the latency histogram metrics. | all
-|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
+|rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
|mem| The amount of memory used by the topology in MB, as reported by the JVM. | all
|failed| The number of failed tuples as reported by Storm's metrics. | all
|start_time| The starting time of the metrics window from when the first topology was launched. | all
http://git-wip-us.apache.org/repos/asf/storm/blob/0e3fc5a0/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 69adabc..457dedf 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -19,9 +19,11 @@
package org.apache.storm.loadgen;
import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -372,7 +374,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
LinkedHashMap<String, MetricExtractor> tmp = new LinkedHashMap<>();
tmp.put("start_time", new MetricExtractor((m, unit) -> m.startTime(),"s"));
tmp.put("end_time", new MetricExtractor((m, unit) -> m.endTime(), "s"));
- tmp.put("completion_rate", new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+ tmp.put("rate", new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
@@ -442,16 +444,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
}
- static class SepValReporter extends FileReporter {
- private final TimeUnit targetUnit;
- private final String separator;
- private final List<String> extractors;
- private final String meta;
+ abstract static class ColumnsFileReporter extends FileReporter {
+ protected final TimeUnit targetUnit;
+ protected final List<String> extractors;
+ protected final String meta;
- public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+ public ColumnsFileReporter(String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
throws FileNotFoundException {
super(path, query, extractorsMap);
- this.separator = separator;
targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
if (targetUnit == null) {
throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
@@ -467,8 +467,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
this.extractors = extractors;
} else {
//Wrapping it makes it mutable
- extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
- "mean", "99%ile", "99.9%ile", "cores", "mem", "failed"));
+ extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "rate",
+ "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids"));
}
if (query.containsKey("extraColumns")) {
@@ -487,7 +487,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
meta = query.get("meta");
}
- private List<String> handleExtractorCleanup(List<String> orig) {
+ protected List<String> handleExtractorCleanup(List<String> orig) {
Map<String, Object> stormConfig = Utils.readStormConfig();
List<String> ret = new ArrayList<>(orig.size());
for (String extractor: orig) {
@@ -508,6 +508,81 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
return ret;
}
+ }
+
+
+ static class FixedWidthReporter extends ColumnsFileReporter {
+ public final String doubleFormat;
+ public final String longFormat;
+ public final String stringFormat;
+
+ public FixedWidthReporter(String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+ throws FileNotFoundException {
+ super(path, query, extractorsMap);
+ int columnWidth = Integer.parseInt(query.getOrDefault("columnWidth", "15")) - 1;//Always have a space in between
+ int precision = Integer.parseInt(query.getOrDefault("precision", "3"));
+ doubleFormat = "%," + columnWidth + "." + precision + "f";
+ longFormat = "%," + columnWidth + "d";
+ stringFormat = "%" + columnWidth + "s";
+ }
+
+ public FixedWidthReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+ this(null, Collections.emptyMap(), allExtractors);
+ }
+
+ private String format(Object o) {
+ if (o instanceof Double || o instanceof Float) {
+ return String.format(doubleFormat, o);
+ } else if (o instanceof Integer || o instanceof Long) {
+ return String.format(longFormat, o);
+ } else {
+ return String.format(stringFormat, o);
+ }
+ }
+
+ @Override
+ public void start() {
+ boolean first = true;
+ for (String name: extractors) {
+ if (!first) {
+ out.print(" ");
+ }
+ first = false;
+ out.print(format(allExtractors.get(name).formatName(name, targetUnit)));
+ }
+ if (meta != null) {
+ out.print(" ");
+ out.print(format("meta"));
+ }
+ out.println();
+ }
+
+ @Override
+ public void reportWindow(Measurements m, List<Measurements> allTime) {
+ boolean first = true;
+ for (String name: extractors) {
+ if (!first) {
+ out.print(" ");
+ }
+ first = false;
+ out.print(format(allExtractors.get(name).get(m, targetUnit)));
+ }
+ if (meta != null) {
+ out.print(" ");
+ out.print(format(meta));
+ }
+ out.println();
+ }
+ }
+
+ static class SepValReporter extends ColumnsFileReporter {
+ private final String separator;
+
+ public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+ throws FileNotFoundException {
+ super(path, query, extractorsMap);
+ this.separator = separator;
+ }
@Override
public void start() {
@@ -628,6 +703,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
.hasArg()
.argName("TYPE:PATH?OPTIONS")
.desc("Provide the config for a reporter to run. Supported types are:\n"
+ + "FIXED - a fixed width format that should be more human readable\n"
+ "LEGACY - (write things out in the legacy format)\n"
+ "TSV - tab separated values\n"
+ "CSV - comma separated values\n"
@@ -693,6 +769,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
type = type.toUpperCase();
switch (type) {
+ case "FIXED":
+ reporters.add(new FixedWidthReporter(path, query, allExtractors));
+ break;
case "LEGACY":
reporters.add(new LegacyReporter(path, query, allExtractors));
break;
@@ -706,8 +785,19 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
throw new RuntimeException(type + " is not a supported reporter type");
}
}
- } else {
- reporters.add(new LegacyReporter(allExtractors));
+ }
+ boolean foundStdOutOrErr = false;
+ for (MetricResultsReporter rep : reporters) {
+ if (rep instanceof FileReporter) {
+ PrintStream ps = ((FileReporter) rep).out;
+ if (ps == System.out || ps == System.err) {
+ foundStdOutOrErr = true;
+ break;
+ }
+ }
+ }
+ if (!foundStdOutOrErr) {
+ reporters.add(new FixedWidthReporter(allExtractors));
}
}
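
The format strings built in the `FixedWidthReporter` constructor above can be tried in isolation. The following is a minimal standalone sketch, not part of the commit: the class name `FixedWidthSketch` and the `row` helper are hypothetical, and `Locale.US` is passed explicitly (the patch itself uses the default locale) so that the grouping and decimal separators are predictable. It mirrors the patch in reserving one character of `columnWidth` for the space between columns.

```java
import java.util.Locale;

// Hypothetical standalone sketch of the fixed width formatting in the patch.
class FixedWidthSketch {
    private final String doubleFormat;
    private final String longFormat;
    private final String stringFormat;

    public FixedWidthSketch(int columnWidth, int precision) {
        // As in the patch: one character of each column is the separating space.
        int width = columnWidth - 1;
        doubleFormat = "%," + width + "." + precision + "f"; // e.g. "%,14.3f"
        longFormat = "%," + width + "d";                     // e.g. "%,14d"
        stringFormat = "%" + width + "s";                    // e.g. "%14s"
    }

    // Pick a format by runtime type; anything that is not a number is
    // rendered through its string form.
    public String format(Object o) {
        if (o instanceof Double || o instanceof Float) {
            return String.format(Locale.US, doubleFormat, o);
        } else if (o instanceof Integer || o instanceof Long) {
            return String.format(Locale.US, longFormat, o);
        } else {
            return String.format(Locale.US, stringFormat, o);
        }
    }

    // Join formatted cells with a single space, as start() and reportWindow() do.
    public String row(Object... cells) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < cells.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(format(cells[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 15 and 3 are the defaults for columnWidth and precision in the patch.
        FixedWidthSketch f = new FixedWidthSketch(15, 3);
        System.out.println(f.row("rate", "mean", "failed"));
        System.out.println(f.row(12345.6789, 1.5, 0L));
    }
}
```

With the defaults every cell is right-aligned in 14 characters, so a header row and a data row line up column by column. A value wider than the column is not truncated by `String.format`, so very large numbers would push later columns to the right.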