Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:50 UTC

[09/18] storm git commit: STORM-2702: Addressed the rest of the review comments

STORM-2702: Addressed the rest of the review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e3fc5a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e3fc5a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e3fc5a0

Branch: refs/heads/master
Commit: 0e3fc5a097d156bad4cf17c74c147255575aa646
Parents: 6bc3213
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 11:13:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 11:13:08 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |  21 ++--
 .../apache/storm/loadgen/LoadMetricsServer.java | 116 ++++++++++++++++---
 2 files changed, 116 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0e3fc5a0/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 50a618f..1e5c69d 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -72,9 +72,12 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency [option
 # Reporters
 Reporters provide a way to store various statistics about a running topology. There are currently a few supported reporters
 
- * legacy - report values like ThroughputVsLatency has done in the past
- * TSV - tab separated values
- * CSV - comma separated values
+ * `legacy` - report values like ThroughputVsLatency has done in the past
+ * `tsv` - tab separated values
+ * `csv` - comma separated values
+ * `fixed` - a human readable fixed width format
+
+A `fixed` reporter to stdout will be added if no other reporters are writing to stdout or stderr.
 
 All of these types can have their data written out to a file.  To do this, add a path after the type.  For example `legacy:./legacy_data` or `tsv:my_run.tsv`. By default the file will be overwritten unless an option is given to append instead. Options are in a URL-like format, with a `?` separating the type:path from the options, and all of the options separated by a `&`.  To append to the file you can do something like `csv:./my_run.csv?append` or `csv:./my_run.csv?append=true`
 
@@ -82,10 +85,12 @@ Not all options are supported by all reporters.
 
 |Reporter Option| Description | Supported Reporters|
 |---------------|-------------|--------------------|
-|time | Set the time unit that you want latency and CPU reported in.  This can be from nanoseconds up to seconds.  Most names are supported for the types| legacy, csv, tsv|
-|columns | A comma separated list of columns to output (see below for the metrics supported).  A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "completion_rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed" | csv, tsv |
-|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv |
-|meta | An arbitrary string that will appear as a "meta" column at the end.  This helps when appending to files to keep different runs separated | csv, tsv|
+|time | Set the time unit that you want latency and CPU reported in.  This can be from nanoseconds up to seconds.  Most names are supported for the types| legacy, csv, tsv, fixed|
+|columns | A comma separated list of columns to output (see below for the metrics supported).  A `*` is replaced by all metrics. Defaults to "start_time", "end_time", "rate", "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids" | csv, tsv, fixed |
+|extraColumns | Like columns but ones that should be added to the defaults instead of replacing them. A `*` is replaced by all metrics. | csv, tsv, fixed |
+|meta | An arbitrary string that will appear as a "meta" column at the end.  This helps when appending to files to keep different runs separated | csv, tsv, fixed|
+|columnWidth | The width of each field | fixed|
+|precision | The number of places after the decimal point to print out | fixed|
 
 There are a lot of different metrics supported
 
@@ -106,7 +111,7 @@ There are a lot of different metrics supported
 |acked| The number of tuples fully acked as reported by Storm's metrics. | all
 |acked_rate| The rate of tuples fully acked as reported by Storm's metrics. | all
 |completed| The number of tuples fully acked as reported by the latency histogram metrics. | all
-|completion_rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
+|rate| The rate of tuples fully acked as reported by the latency histogram metrics. | all
 |mem| The amount of memory used by the topology in MB, as reported by the JVM. | all
 |failed| The number of failed tuples as reported by Storm's metrics. | all
 |start_time| The starting time of the metrics window from when the first topology was launched. | all
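
As a concrete, purely hypothetical illustration of the `TYPE:PATH?OPTIONS` spec shape documented above (the CLI flag that accepts these specs is not visible in this diff and is assumed here to be `--reporter`):

storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency \
    --reporter 'csv:./my_run.csv?append=true&time=milliseconds&extraColumns=ids&meta=run-1' \
    --reporter 'fixed:./fixed_run.txt?columnWidth=12&precision=2'

The first spec appends comma separated rows (with the extra `ids` column and a `meta` tag of `run-1`) to ./my_run.csv, reporting latencies in milliseconds; the second writes the new human readable fixed width format to ./fixed_run.txt with narrower columns and two decimal places.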

http://git-wip-us.apache.org/repos/asf/storm/blob/0e3fc5a0/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 69adabc..457dedf 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -19,9 +19,11 @@
 package org.apache.storm.loadgen;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
+import java.io.PrintWriter;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -372,7 +374,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         LinkedHashMap<String, MetricExtractor> tmp = new LinkedHashMap<>();
         tmp.put("start_time",  new MetricExtractor((m, unit) -> m.startTime(),"s"));
         tmp.put("end_time",  new MetricExtractor((m, unit) -> m.endTime(), "s"));
-        tmp.put("completion_rate",  new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
+        tmp.put("rate",  new MetricExtractor((m, unit) -> m.getCompletedPerSec(), "tuple/s"));
         tmp.put("mean", new MetricExtractor((m, unit) -> m.getMeanLatency(unit)));
         tmp.put("99%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.0, unit)));
         tmp.put("99.9%ile", new MetricExtractor((m, unit) -> m.getLatencyAtPercentile(99.9, unit)));
@@ -442,16 +444,14 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         }
     }
 
-    static class SepValReporter extends  FileReporter {
-        private final TimeUnit targetUnit;
-        private final String separator;
-        private final List<String> extractors;
-        private final String meta;
+    abstract static class ColumnsFileReporter extends FileReporter {
+        protected final TimeUnit targetUnit;
+        protected final List<String> extractors;
+        protected final String meta;
 
-        public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+        public ColumnsFileReporter(String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
             throws FileNotFoundException {
             super(path, query, extractorsMap);
-            this.separator = separator;
             targetUnit = UNIT_MAP.get(query.getOrDefault("time", "MILLISECONDS").toUpperCase());
             if (targetUnit == null) {
                 throw new IllegalArgumentException(query.get("time") + " is not a supported time unit");
@@ -467,8 +467,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                 this.extractors = extractors;
             } else {
                 //Wrapping it makes it mutable
-                extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "completion_rate",
-                    "mean", "99%ile", "99.9%ile", "cores", "mem", "failed"));
+                extractors = new ArrayList<>(Arrays.asList("start_time", "end_time", "rate",
+                    "mean", "99%ile", "99.9%ile", "cores", "mem", "failed", "ids"));
             }
 
             if (query.containsKey("extraColumns")) {
@@ -487,7 +487,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             meta = query.get("meta");
         }
 
-        private List<String> handleExtractorCleanup(List<String> orig) {
+        protected List<String> handleExtractorCleanup(List<String> orig) {
             Map<String, Object> stormConfig = Utils.readStormConfig();
             List<String> ret = new ArrayList<>(orig.size());
             for (String extractor: orig) {
@@ -508,6 +508,81 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             }
             return ret;
         }
+    }
+
+
+    static class FixedWidthReporter extends  ColumnsFileReporter {
+        public final String doubleFormat;
+        public final String longFormat;
+        public final String stringFormat;
+
+        public FixedWidthReporter(String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+            throws FileNotFoundException {
+            super(path, query, extractorsMap);
+            int columnWidth = Integer.parseInt(query.getOrDefault("columnWidth", "15")) - 1;//Always have a space in between
+            int precision = Integer.parseInt(query.getOrDefault("precision", "3"));
+            doubleFormat = "%," + columnWidth + "." + precision + "f";
+            longFormat = "%," + columnWidth + "d";
+            stringFormat = "%" + columnWidth + "s";
+        }
+
+        public FixedWidthReporter(Map<String, MetricExtractor> allExtractors) throws FileNotFoundException {
+            this(null, Collections.emptyMap(), allExtractors);
+        }
+
+        private String format(Object o) {
+            if (o instanceof Double || o instanceof Float) {
+                return String.format(doubleFormat, o);
+            } else if (o instanceof Integer || o instanceof Long) {
+                return String.format(longFormat, o);
+            } else {
+                return String.format(stringFormat, o);
+            }
+        }
+
+        @Override
+        public void start() {
+            boolean first = true;
+            for (String name: extractors) {
+                if (!first) {
+                    out.print(" ");
+                }
+                first = false;
+                out.print(format(allExtractors.get(name).formatName(name, targetUnit)));
+            }
+            if (meta != null) {
+                out.print(" ");
+                out.print(format("meta"));
+            }
+            out.println();
+        }
+
+        @Override
+        public void reportWindow(Measurements m, List<Measurements> allTime) {
+            boolean first = true;
+            for (String name: extractors) {
+                if (!first) {
+                    out.print(" ");
+                }
+                first = false;
+                out.print(format(allExtractors.get(name).get(m, targetUnit)));
+            }
+            if (meta != null) {
+                out.print(" ");
+                out.print(format(meta));
+            }
+            out.println();
+        }
+    }
+
+    static class SepValReporter extends  ColumnsFileReporter {
+        private final String separator;
+
+        public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
+            throws FileNotFoundException {
+            super(path, query, extractorsMap);
+            this.separator = separator;
+        }
 
         @Override
         public void start() {
@@ -628,6 +703,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             .hasArg()
             .argName("TYPE:PATH?OPTIONS")
             .desc("Provide the config for a reporter to run.  Supported types are:\n"
+                + "FIXED - a fixed width format that should be more human readable\n"
                 + "LEGACY - (write things out in the legacy format)\n"
                 + "TSV - tab separated values\n"
                 + "CSV - comma separated values\n"
@@ -693,6 +769,9 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                 }
                 type = type.toUpperCase();
                 switch (type) {
+                    case "FIXED":
+                        reporters.add(new FixedWidthReporter(path, query, allExtractors));
+                        break;
                     case "LEGACY":
                         reporters.add(new LegacyReporter(path, query, allExtractors));
                         break;
@@ -706,8 +785,19 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
                         throw new RuntimeException(type + " is not a supported reporter type");
                 }
             }
-        } else {
-            reporters.add(new LegacyReporter(allExtractors));
+        }
+        boolean foundStdOutOrErr = false;
+        for (MetricResultsReporter rep : reporters) {
+            if (rep instanceof FileReporter) {
+                PrintStream ps = ((FileReporter) rep).out;
+                if (ps == System.out || ps == System.err) {
+                    foundStdOutOrErr = true;
+                    break;
+                }
+            }
+        }
+        if (!foundStdOutOrErr) {
+            reporters.add(new FixedWidthReporter(allExtractors));
         }
     }
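
For readers skimming the Java hunk above, here is a minimal standalone sketch (not part of the commit) of how the format strings built in the FixedWidthReporter constructor render values, assuming the defaults columnWidth=15 and precision=3 and an en_US-style locale for the grouping separator:

public class FixedWidthFormatDemo {
    public static void main(String[] args) {
        int columnWidth = 15 - 1;  // one character is reserved for the space between columns
        int precision = 3;
        String doubleFormat = "%," + columnWidth + "." + precision + "f";
        String longFormat = "%," + columnWidth + "d";
        String stringFormat = "%" + columnWidth + "s";

        // Doubles are right aligned, grouped, and rounded to 'precision' places:
        System.out.println(String.format(doubleFormat, 12345.6789)); // "    12,345.679"
        // Integral values only get the grouping separator:
        System.out.println(String.format(longFormat, 9876543L));     // "     9,876,543"
        // Headers and the optional meta column fall back to plain right alignment:
        System.out.println(String.format(stringFormat, "99%ile"));   // "        99%ile"
    }
}

The tail of the diff also changes the default behavior: instead of always falling back to the legacy reporter, a FixedWidthReporter writing to stdout is now added whenever no configured FileReporter already targets stdout or stderr, which matches the new README note about the `fixed` reporter.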