You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 19:14:47 UTC

[06/18] storm git commit: STORM-2702: Added in component specific adjustments to GenLoad

STORM-2702: Added in component specific adjustments to GenLoad


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b4cd98fa
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b4cd98fa
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b4cd98fa

Branch: refs/heads/master
Commit: b4cd98fa9f2f263aee3eaf7df53b385e62a34386
Parents: 211f8a9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 09:09:34 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Aug 24 09:09:34 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |  6 +-
 .../java/org/apache/storm/loadgen/GenLoad.java  | 88 ++++++++++++++++----
 .../apache/storm/loadgen/LoadMetricsServer.java |  3 +-
 .../apache/storm/loadgen/TopologyLoadConf.java  | 63 ++++++++++++--
 4 files changed, 131 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b4cd98fa/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index e91409c..827fd77 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -39,11 +39,11 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_
 | --debug | Print debug information about the adjusted topology before submitting it. |
 |-h,--help | Print a help message |
 | --local-or-shuffle | Replace shuffle grouping with local or shuffle grouping. |
-| --parallel &lt;MULTIPLIER> | How much to scale the topology up or down in parallelism. The new parallelism will round up to the next whole number (defaults to 1.0 no scaling) The total throughput of the topology will not be scaled. |
+| --parallel &lt;MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in parallelism. The new parallelism will round up to the next whole number. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components that match the other part will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one. (defaults to 1.0 no scaling) |
 | -r,--report-interval &lt;INTERVAL_SECS> | How long in between reported metrics.  Will be rounded up to the next 10 sec boundary. default 30 |
 | --reporter &lt;TYPE:FILE?OPTIONS>  | Provide the config for a reporter to run. See below for more information about these |
 | -t,--test-time &lt;MINS> | How long to run the tests for in mins (defaults to 5) |
-| --throughput &lt;MULTIPLIER> | How much to scale the topology up or down in throughput. (defaults to 1.0 no scaling)|
+| --throughput &lt;MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in throughput. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one. (defaults to 1.0 no scaling) |
 | -w,--report-window &lt;INTERVAL_SECS> | How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary. default 30|
 
 ## ThroughputVsLatency
@@ -125,7 +125,9 @@ There are a lot of different metrics supported
 |split_parallel| The parallelism of the split bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
 |count_parallel| The parallelism of the count bolt for the `ThroughputVsLatency` topology. | ThroughputVsLatency
 |parallel\_adjust| The adjustment to the parallelism in `GenLoad`. | GenLoad
+|topo_parallel| A list of topology/component specific adjustment rules to the parallelism in `GenLoad`. | GenLoad
 |throughput_adjust| The adjustment to the throughput in `GenLoad`. | GenLoad
+|topo_throughput| A list of topology/component specific adjustment rules to the throughput in `GenLoad`. | GenLoad
 |local\_or\_shuffle| true if shuffles were replaced with local or shuffle in GenLoad. | GenLoad
 
 There are also some generic rules that you can use for some metrics.  Any metric that starts with `"conf:"` will be the config for that.  It does not include config overrides from the `GenLoad` file.

http://git-wip-us.apache.org/repos/asf/storm/blob/b4cd98fa/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index 8821b2a..95ba8dd 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -48,6 +51,8 @@ import org.slf4j.LoggerFactory;
 public class GenLoad {
     private static final Logger LOG = LoggerFactory.getLogger(GenLoad.class);
     private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
+    private static final Pattern MULTI_PATTERN = Pattern.compile(
+        "(?<value>[^:?]+)(?::(?<topo>[^:]*):(?<comp>.*))?");
 
     /**
      * Main entry point for GenLoad application.
@@ -68,18 +73,25 @@ public class GenLoad {
             .build());
         options.addOption(Option.builder()
             .longOpt("parallel")
-            .argName("MULTIPLIER")
+            .argName("MULTIPLIER(:TOPO:COMP)?")
             .hasArg()
-            .desc("How much to scale the topology up or down in parallelism.\n"
-                + "The new parallelism will round up to the next whole number\n"
+            .desc("How much to scale the topology up or down in parallelism. "
+                + "The new parallelism will round up to the next whole number. "
+                + "If a topology + component is supplied only that component will be scaled. "
+                + "If topo or component is blank or a '*' all topologies or components matched will be scaled. "
+                + "Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more "
+                + "specific than not providing one."
                 + "(defaults to 1.0 no scaling)")
             .build());
         options.addOption(Option.builder()
             .longOpt("throughput")
-            .argName("MULTIPLIER")
+            .argName("MULTIPLIER(:TOPO:COMP)?")
             .hasArg()
-            .desc("How much to scale the topology up or down in throughput.\n"
-                + "Note this is applied after and build on any parallelism changes.\n"
+            .desc("How much to scale the topology up or down in throughput. "
+                + "If a topology + component is supplied only that component will be scaled. "
+                + "If topo or component is blank or a '*' all topologies or components matched will be scaled. "
+                + "Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more "
+                + "specific than not providing one."
                 + "(defaults to 1.0 no scaling)")
             .build());
         options.addOption(Option.builder()
@@ -95,18 +107,58 @@ public class GenLoad {
         CommandLine cmd = null;
         Exception commandLineException = null;
         double executeTime = TEST_EXECUTE_TIME_DEFAULT;
-        double parallel = 1.0;
-        double throughput = 1.0;
+        double globalParallel = 1.0;
+        Map<String, Double> topoSpecificParallel = new HashMap<>();
+        double globalThroughput = 1.0;
+        Map<String, Double> topoSpecificThroughput = new HashMap<>();
         try {
             cmd = parser.parse(options, args);
             if (cmd.hasOption("t")) {
                 executeTime = Double.valueOf(cmd.getOptionValue("t"));
             }
             if (cmd.hasOption("parallel")) {
-                parallel = Double.parseDouble(cmd.getOptionValue("parallel"));
+                for (String stringParallel : cmd.getOptionValues("parallel")) {
+                    Matcher m = MULTI_PATTERN.matcher(stringParallel);
+                    if (!m.matches()) {
+                        throw new ParseException("--parallel " + stringParallel + " is not in the format MULTIPLIER(:TOPO:COMP)?");
+                    }
+                    double parallel = Double.parseDouble(m.group("value"));
+                    String topo = m.group("topo");
+                    if (topo == null || topo.isEmpty()) {
+                        topo = "*";
+                    }
+                    String comp = m.group("comp");
+                    if (comp == null || comp.isEmpty()) {
+                        comp = "*";
+                    }
+                    if ("*".equals(topo) && "*".equals(comp)) {
+                        globalParallel = parallel;
+                    } else {
+                        topoSpecificParallel.put(topo + ":" + comp, parallel);
+                    }
+                }
             }
             if (cmd.hasOption("throughput")) {
-                throughput = Double.parseDouble(cmd.getOptionValue("throughput"));
+                for (String stringThroughput : cmd.getOptionValues("throughput")) {
+                    Matcher m = MULTI_PATTERN.matcher(stringThroughput);
+                    if (!m.matches()) {
+                        throw new ParseException("--throughput " + stringThroughput + " is not in the format MULTIPLIER(:TOPO:COMP)?");
+                    }
+                    double throughput = Double.parseDouble(m.group("value"));
+                    String topo = m.group("topo");
+                    if (topo == null || topo.isEmpty()) {
+                        topo = "*";
+                    }
+                    String comp = m.group("comp");
+                    if (comp == null || comp.isEmpty()) {
+                        comp = "*";
+                    }
+                    if ("*".equals(topo) && "*".equals(comp)) {
+                        globalThroughput = throughput;
+                    } else {
+                        topoSpecificThroughput.put(topo + ":" + comp, throughput);
+                    }
+                }
             }
         } catch (ParseException | NumberFormatException e) {
             commandLineException = e;
@@ -119,9 +171,13 @@ public class GenLoad {
             return;
         }
         Map<String, Object> metrics = new LinkedHashMap<>();
-        metrics.put("parallel_adjust", parallel);
-        metrics.put("throughput_adjust", throughput);
+        metrics.put("parallel_adjust", globalParallel);
+        metrics.put("throughput_adjust", globalThroughput);
         metrics.put("local_or_shuffle", cmd.hasOption("local-or-shuffle"));
+        metrics.put("topo_parallel", topoSpecificParallel.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
+            .collect(Collectors.toList()));
+        metrics.put("topo_throughput", topoSpecificThroughput.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
+            .collect(Collectors.toList()));
 
         Config conf = new Config();
         LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
@@ -134,12 +190,8 @@ public class GenLoad {
             for (String topoFile : cmd.getArgList()) {
                 try {
                     TopologyLoadConf tlc = readTopology(topoFile);
-                    if (parallel != 1.0) {
-                        tlc = tlc.scaleParallel(parallel);
-                    }
-                    if (throughput != 1.0) {
-                        tlc = tlc.scaleThroughput(throughput);
-                    }
+                    tlc = tlc.scaleParallel(globalParallel, topoSpecificParallel);
+                    tlc = tlc.scaleThroughput(globalThroughput, topoSpecificThroughput);
                     if (cmd.hasOption("local-or-shuffle")) {
                         tlc = tlc.replaceShuffleWithLocalOrShuffle();
                     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4cd98fa/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index fd12247..69adabc 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -630,7 +630,8 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
             .desc("Provide the config for a reporter to run.  Supported types are:\n"
                 + "LEGACY - (write things out in the legacy format)\n"
                 + "TSV - tab separated values\n"
-                + "CSV - comma separated values")
+                + "CSV - comma separated values\n"
+                + "PATH and OPTIONS are each optional but must be marked with a ':' or '?' separator respectively.")
             .build());
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4cd98fa/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
index 1a45ccc..38458c6 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
@@ -228,13 +228,57 @@ public class TopologyLoadConf {
     }
 
     /**
+     * Return the first argument that is not null.
+     * @param rest the candidate values, checked in the order given
+     * @param <V> the type of the candidate values.
+     * @return the first candidate that is not null, or null if all of them are null
+     */
+    static <V> V or(V...rest) {
+        for (V i: rest) {
+            if (i != null) {
+                return i;
+            }
+        }
+        return null;
+    }
+
+    LoadCompConf scaleCompParallel(LoadCompConf comp, double v, Map<String, Double> topoSpecificParallel) {
+        LoadCompConf ret = comp;
+        double scale = or(topoSpecificParallel.get(name + ":" + comp.id),
+            topoSpecificParallel.get(name + ":*"),
+            topoSpecificParallel.get("*:" + comp.id),
+            v);
+        if (scale != 1.0) {
+            ret = ret.scaleParallel(scale);
+        }
+        return ret;
+    }
+
+    LoadCompConf scaleCompThroughput(LoadCompConf comp, double v, Map<String, Double> topoSpecificParallel) {
+        LoadCompConf ret = comp;
+        double scale = or(topoSpecificParallel.get(name + ":" + comp.id),
+            topoSpecificParallel.get(name + ":*"),
+            topoSpecificParallel.get("*:" + comp.id),
+            v);
+        if (scale != 1.0) {
+            ret = ret.scaleThroughput(scale);
+        }
+        return ret;
+    }
+
+    /**
      * Scale all of the components in the topology by a percentage (but keep the throughput the same).
      * @param v the amount to scale them by.  1.0 is nothing, 0.5 cuts them in half, 2.0 doubles them.
      * @return a copy of this with the needed adjustments made.
      */
-    public TopologyLoadConf scaleParallel(double v) {
-        List<LoadCompConf> scaledSpouts = spouts.stream().map((c) -> c.scaleParallel(v)).collect(Collectors.toList());
-        List<LoadCompConf> scaledBolts = bolts.stream().map((c) -> c.scaleParallel(v)).collect(Collectors.toList());
+    public TopologyLoadConf scaleParallel(double v, Map<String, Double> topoSpecific) {
+        if (v == 1.0 && (topoSpecific == null || topoSpecific.isEmpty())) {
+            return this;
+        }
+        List<LoadCompConf> scaledSpouts = spouts.stream().map((s) -> scaleCompParallel(s, v, topoSpecific))
+            .collect(Collectors.toList());
+        List<LoadCompConf> scaledBolts = bolts.stream().map((s) -> scaleCompParallel(s, v, topoSpecific))
+            .collect(Collectors.toList());
         return new TopologyLoadConf(name, topoConf, scaledSpouts, scaledBolts, streams);
     }
 
@@ -243,11 +287,14 @@ public class TopologyLoadConf {
      * @param v the amount to scale it by 1.0 is nothing 0.5 cuts it in half and 2.0 doubles it.
      * @return a copy of this with the needed adjustments made.
      */
-    public TopologyLoadConf scaleThroughput(double v) {
-        List<LoadCompConf> scaledSpouts = spouts.stream()
-            .map((c) -> c.scaleThroughput(v)).collect(Collectors.toList());
-        List<LoadCompConf> scaledBolts = bolts.stream()
-            .map((c) -> c.scaleThroughput(v)).collect(Collectors.toList());
+    public TopologyLoadConf scaleThroughput(double v, Map<String, Double> topoSpecific) {
+        if (v == 1.0 && (topoSpecific == null || topoSpecific.isEmpty())) {
+            return this;
+        }
+        List<LoadCompConf> scaledSpouts = spouts.stream().map((s) -> scaleCompThroughput(s, v, topoSpecific))
+            .collect(Collectors.toList());
+        List<LoadCompConf> scaledBolts = bolts.stream().map((s) -> scaleCompThroughput(s, v, topoSpecific))
+            .collect(Collectors.toList());
         return new TopologyLoadConf(name, topoConf, scaledSpouts, scaledBolts, streams);
     }