Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/14 15:32:15 UTC
[1/2] storm git commit: STORM-2733: Better load aware shuffle implementation
Repository: storm
Updated Branches:
refs/heads/master 79010512d -> 124acb92d
STORM-2733: Better load aware shuffle implementation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9f1d7ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9f1d7ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9f1d7ef
Branch: refs/heads/master
Commit: b9f1d7efd96cf2fb1393f7a2e4277e3fb68b21bc
Parents: 6d23b8b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 11:13:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 11 15:38:03 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 5 +
examples/storm-loadgen/pom.xml | 2 +-
.../loadgen/ExecAndProcessLatencyEngine.java | 119 ++++++++++++++
.../java/org/apache/storm/loadgen/GenLoad.java | 25 +++
.../java/org/apache/storm/loadgen/LoadBolt.java | 63 +------
.../org/apache/storm/loadgen/LoadCompConf.java | 43 ++++-
.../apache/storm/loadgen/LoadMetricsServer.java | 4 +-
.../apache/storm/loadgen/NormalDistStats.java | 2 +-
.../org/apache/storm/loadgen/OutputStream.java | 2 +-
.../storm/loadgen/SlowExecutorPattern.java | 83 ++++++++++
.../storm/loadgen/ThroughputVsLatency.java | 47 +++++-
.../apache/storm/loadgen/TopologyLoadConf.java | 27 ++-
.../src/jvm/org/apache/storm/daemon/Task.java | 2 -
.../org/apache/storm/daemon/worker/Worker.java | 2 +-
.../apache/storm/daemon/worker/WorkerState.java | 22 ++-
.../jvm/org/apache/storm/executor/Executor.java | 10 +-
.../apache/storm/executor/ExecutorShutdown.java | 18 +-
.../apache/storm/executor/IRunningExecutor.java | 3 +
.../storm/executor/bolt/BoltExecutor.java | 4 +-
.../storm/executor/spout/SpoutExecutor.java | 4 +-
.../grouping/LoadAwareShuffleGrouping.java | 129 +++++++--------
.../grouping/LoadAwareShuffleGroupingTest.java | 163 +++++++------------
22 files changed, 512 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 5a6a030..88ad5f9 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -47,6 +47,7 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_
| -t,--test-time <MINS> | How long to run the tests for in mins (defaults to 5) |
| --throughput <MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in throughput. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one.(defaults to 1.0 no scaling)|
| -w,--report-window <INTERVAL_SECS> | How long of a rolling window should be in each report. Will be rounded up to the next report interval boundary. default 30|
+| --imbalance <MS(:COUNT)?:TOPO:COMP> | The number of ms that the first COUNT of TOPO:COMP will wait before processing. This creates an imbalance that helps test load aware groupings. By default there is no imbalance unless specified by the capture file. |
## ThroughputVsLatency
A word count topology with metrics reporting like the `GenLoad` command.
@@ -68,6 +69,7 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency [option
| --spouts <NUM>| Number of spouts to use (defaults to 1) |
| -t,--test-time <MINS>| How long to run the tests for in mins (defaults to 5) |
| -w,--report-window <INTERVAL_SECS>| How long of a rolling window should be in each report. Will be rounded up to the next report interval boundary.|
+| --splitter-imbalance <MS(:COUNT)?> | The number of ms that the first COUNT splitters will wait before processing. This creates an imbalance that helps test load aware groupings (defaults to 0:1)|
# Reporters
Reporters provide a way to store various statistics about a running topology. There are currently a few supported reporters
@@ -140,6 +142,7 @@ There are a lot of different metrics supported
|throughput_adjust| The adjustment to the throughput in `GenLoad`. | GenLoad
|topo_throughput| A list of topology/component specific adjustment rules to the throughput in `GenLoad`. | GenLoad
|local\_or\_shuffle| true if shuffles were replaced with local or shuffle in GenLoad. | GenLoad
+|slow\_execs| A list of topology/component specific adjustment rules to the slowExecutorPattern in `GenLoad`. | GenLoad
There are also some generic rules that you can use for some metrics. Any metric that starts with `"conf:"` will be the config for that. It does not include config overrides from the `GenLoad` file.
@@ -170,6 +173,8 @@ Spouts and bolts have the same format.
| streams | The streams that are produced by this bolt or spout |
| cpuLoad | The number of cores this component needs for resource aware scheduling |
| memoryLoad | The amount of memory in MB that this component needs for resource aware scheduling |
+| slowExecutorPattern.slownessMs | an Optional number of ms to slow down the exec + process latency for some of this component (defaults to 0) |
+| slowExecutorPattern.count | the number of components to slow down (defaults to 1) |
### Output Streams
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/pom.xml b/examples/storm-loadgen/pom.xml
index aa43638..e79e14c 100644
--- a/examples/storm-loadgen/pom.xml
+++ b/examples/storm-loadgen/pom.xml
@@ -114,7 +114,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>3</maxAllowedViolations>
+ <maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
new file mode 100644
index 0000000..2f5257d
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A more accurate sleep implementation.
+ */
+public class ExecAndProcessLatencyEngine implements Serializable {
+ private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
+ private final SlowExecutorPattern skewedPattern;
+
+ public static long toNano(double ms) {
+ return (long)(ms * NANO_IN_MS);
+ }
+
+ private final AtomicLong parkOffset = new AtomicLong(0);
+ private Random rand;
+ private ScheduledExecutorService timer;
+
+ public ExecAndProcessLatencyEngine() {
+ this(null);
+ }
+
+ public ExecAndProcessLatencyEngine(SlowExecutorPattern skewedPattern) {
+ this.skewedPattern = skewedPattern;
+ }
+
+ public void prepare() {
+ this.rand = ThreadLocalRandom.current();
+ this.timer = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ /**
+ * Sleep for a set number of nano seconds.
+ * @param start the start time of the sleep
+ * @param sleepAmount how many nano seconds after start when we should stop.
+ */
+ public void sleepNano(long start, long sleepAmount) {
+ long endTime = start + sleepAmount;
+ // A small control algorithm to adjust the amount of time that we sleep to make it more accurate
+ long newEnd = endTime - parkOffset.get();
+ long diff = newEnd - start;
+ //There are some different levels of accuracy here, and we want to deal with all of them
+ if (diff <= 1_000) {
+ //We are done, nothing that short is going to work here
+ } else if (diff < NANO_IN_MS) {
+ //Busy wait...
+ long sum = 0;
+ while (System.nanoTime() < newEnd) {
+ for (long i = 0; i < 1_000_000; i++) {
+ sum += i;
+ }
+ }
+ } else {
+ //More accurate than Thread.sleep, but still not great
+ LockSupport.parkNanos(newEnd - System.nanoTime());
+ }
+ parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+ }
+
+ public void sleepNano(long nano) {
+ sleepNano(System.nanoTime(), nano);
+ }
+
+ public void sleepUntilNano(long endTime) {
+ long start = System.nanoTime();
+ sleepNano(start, endTime - start);
+ }
+
+ /**
+ * Simulate both process and exec times.
+ * @param executorIndex the index of this executor. It is used to skew the latencies.
+ * @param startTimeNs when the executor started in nano-seconds.
+ * @param in the metrics for the input stream (or null if you don't want to use them).
+ * @param r what to run when the process latency is up. Note that this may run on a separate thread after this method call has
+ * completed.
+ */
+ public void simulateProcessAndExecTime(int executorIndex, long startTimeNs, InputStream in, Runnable r) {
+ long extraTimeNs = skewedPattern == null ? 0 : toNano(skewedPattern.getExtraSlowness(executorIndex));
+ long endExecNs = startTimeNs + extraTimeNs + (in == null ? 0 : ExecAndProcessLatencyEngine.toNano(in.execTime.nextRandom(rand)));
+ long endProcNs = startTimeNs + extraTimeNs + (in == null ? 0 : ExecAndProcessLatencyEngine.toNano(in.processTime.nextRandom(rand)));
+
+ if ((endProcNs - 1_000_000) < endExecNs) {
+ sleepUntilNano(endProcNs);
+ r.run();
+ } else {
+ timer.schedule(() -> {
+ r.run();
+ }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
+ }
+
+ sleepUntilNano(endExecNs);
+ }
+}
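The feedback loop in `sleepNano` above can be illustrated in isolation. The following is a minimal sketch, not the committed code: the class name `SleepCorrectionSketch`, the `getParkOffset` accessor, and the returned overshoot value are assumptions added for illustration, and the busy-wait branch for sub-millisecond sleeps is omitted. It shows only the idea that half of each measured overshoot is folded into an offset that shortens the next park.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

public class SleepCorrectionSketch {
    // Running estimate of how far past the deadline a park tends to wake up (hypothetical field,
    // mirroring parkOffset in the diff above).
    private final AtomicLong parkOffset = new AtomicLong(0);

    /**
     * Park until roughly start + sleepAmount, then fold half of the measured
     * error back into the offset used by the next call.
     * @return the measured overshoot in nanoseconds (negative if we woke early).
     */
    public long sleepNano(long start, long sleepAmount) {
        long endTime = start + sleepAmount;
        // Aim earlier than the real deadline by the current correction.
        long newEnd = endTime - parkOffset.get();
        long remaining = newEnd - System.nanoTime();
        if (remaining > 0) {
            LockSupport.parkNanos(remaining);
        }
        long overshoot = System.nanoTime() - endTime;
        // Only half of the error is applied, which damps oscillation.
        parkOffset.addAndGet(overshoot / 2);
        return overshoot;
    }

    public long getParkOffset() {
        return parkOffset.get();
    }

    public static void main(String[] args) {
        SleepCorrectionSketch sleeper = new SleepCorrectionSketch();
        for (int i = 0; i < 5; i++) {
            long overshoot = sleeper.sleepNano(System.nanoTime(), 2_000_000L);
            System.out.println("iteration " + i + " overshoot ns: " + overshoot
                + " offset ns: " + sleeper.getParkOffset());
        }
    }
}
```

The printed numbers depend on the machine and scheduler, so none are shown here. The expectation, under the assumptions above, is only that the offset moves toward the typical wake-up error over repeated calls.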
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index f535b32..141f11a 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -100,6 +100,13 @@ public class GenLoad {
.desc("replace shuffle grouping with local or shuffle grouping")
.build());
options.addOption(Option.builder()
+ .longOpt("imbalance")
+ .argName("MS(:COUNT)?:TOPO:COMP")
+ .hasArg()
+ .desc("The number of ms that the first COUNT of TOPO:COMP will wait before processing. This creates an imbalance "
+ + "that helps test load aware groupings. By default there is no imbalance. If no count is given it defaults to 1")
+ .build());
+ options.addOption(Option.builder()
.longOpt("debug")
.desc("Print debug information about the adjusted topology before submitting it.")
.build());
@@ -112,6 +119,7 @@ public class GenLoad {
Map<String, Double> topoSpecificParallel = new HashMap<>();
double globalThroughput = 1.0;
Map<String, Double> topoSpecificThroughput = new HashMap<>();
+ Map<String, SlowExecutorPattern> topoSpecificImbalance = new HashMap<>();
try {
cmd = parser.parse(options, args);
if (cmd.hasOption("t")) {
@@ -161,6 +169,20 @@ public class GenLoad {
}
}
}
+ if (cmd.hasOption("imbalance")) {
+ for (String stringImbalance : cmd.getOptionValues("imbalance")) {
+ //We require there to be both a topology and a component in this case, so parse it out as such.
+ String [] parts = stringImbalance.split(":");
+ if (parts.length < 3 || parts.length > 4) {
+ throw new ParseException(stringImbalance + " does not appear to match the expected pattern");
+ } else if (parts.length == 3) {
+ topoSpecificImbalance.put(parts[1] + ":" + parts[2], SlowExecutorPattern.fromString(parts[0]));
+ } else { //== 4
+ topoSpecificImbalance.put(parts[2] + ":" + parts[3],
+ SlowExecutorPattern.fromString(parts[0] + ":" + parts[1]));
+ }
+ }
+ }
} catch (ParseException | NumberFormatException e) {
commandLineException = e;
}
@@ -179,6 +201,8 @@ public class GenLoad {
.collect(Collectors.toList()));
metrics.put("topo_throuhgput", topoSpecificThroughput.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
.collect(Collectors.toList()));
+ metrics.put("slow_execs", topoSpecificImbalance.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
+ .collect(Collectors.toList()));
Config conf = new Config();
LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
@@ -193,6 +217,7 @@ public class GenLoad {
TopologyLoadConf tlc = readTopology(topoFile);
tlc = tlc.scaleParallel(globalParallel, topoSpecificParallel);
tlc = tlc.scaleThroughput(globalThroughput, topoSpecificThroughput);
+ tlc = tlc.overrideSlowExecs(topoSpecificImbalance);
if (cmd.hasOption("local-or-shuffle")) {
tlc = tlc.replaceShuffleWithLocalOrShuffle();
}
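The `--imbalance` parsing above splits one argument into a `TOPO:COMP` key and an `MS(:COUNT)?` pattern string. A minimal standalone sketch of that split follows. The class `ImbalanceArgSketch` and its `split` helper are hypothetical names, and the sketch returns plain strings instead of building a `SlowExecutorPattern`.

```java
import java.util.AbstractMap;
import java.util.Map;

public class ImbalanceArgSketch {
    /**
     * Split "MS(:COUNT)?:TOPO:COMP" into a (key, pattern) pair, where key is
     * "TOPO:COMP" and pattern is "MS" or "MS:COUNT".
     */
    static Map.Entry<String, String> split(String arg) {
        String[] parts = arg.split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException(arg + " does not appear to match the expected pattern");
        }
        if (parts.length == 3) {
            // No COUNT given: MS:TOPO:COMP
            return new AbstractMap.SimpleEntry<>(parts[1] + ":" + parts[2], parts[0]);
        }
        // COUNT given: MS:COUNT:TOPO:COMP
        return new AbstractMap.SimpleEntry<>(parts[2] + ":" + parts[3], parts[0] + ":" + parts[1]);
    }

    public static void main(String[] args) {
        System.out.println(split("10:wordcount:split"));   // key wordcount:split, pattern 10
        System.out.println(split("10:2:wordcount:split")); // key wordcount:split, pattern 10:2
    }
}
```

Because `String.split` drops trailing empty strings, an argument such as `10:topo:` yields only two parts and is rejected, so both the topology and the component have to be non-empty in this form.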
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
index e02e8f8..28b611f 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -28,8 +28,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.OutputCollector;
@@ -47,20 +45,16 @@ import org.slf4j.LoggerFactory;
*/
public class LoadBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(LoadBolt.class);
- private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
private final List<OutputStream> outputStreamStats;
private List<OutputStreamEngine> outputStreams;
private final Map<GlobalStreamId, InputStream> inputStreams = new HashMap<>();
private OutputCollector collector;
- private Random rand;
- private ScheduledExecutorService timer;
-
- private static long toNano(double ms) {
- return (long)(ms * NANO_IN_MS);
- }
+ private final ExecAndProcessLatencyEngine sleep;
+ private int executorIndex;
public LoadBolt(LoadCompConf conf) {
this.outputStreamStats = Collections.unmodifiableList(new ArrayList<>(conf.streams));
+ sleep = new ExecAndProcessLatencyEngine(conf.slp);
}
public void add(InputStream inputStream) {
@@ -73,33 +67,8 @@ public class LoadBolt extends BaseRichBolt {
outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
.map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
this.collector = collector;
- this.rand = ThreadLocalRandom.current();
- this.timer = Executors.newSingleThreadScheduledExecutor();
- }
-
- private final AtomicLong parkOffset = new AtomicLong(0);
-
- private void mySleep(long endTime) {
- //There are some different levels of accuracy here, and we want to deal with all of them
- long start = System.nanoTime();
- long newEnd = endTime - parkOffset.get();
- long diff = newEnd - start;
- if (diff <= 1_000) {
- //We are done, nothing that short is going to work here
- } else if (diff < NANO_IN_MS) {
- //Busy wait...
- long sum = 0;
- while (System.nanoTime() < newEnd) {
- for (long i = 0; i < 1_000_000; i++) {
- sum += i;
- }
- }
- } else {
- //More accurate that thread.sleep, but still not great
- LockSupport.parkNanos(newEnd - System.nanoTime() - parkOffset.get());
- // A small control algorithm to adjust the amount of time that we sleep to make it more accurate
- }
- parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+ executorIndex = context.getThisTaskIndex();
+ sleep.prepare();
}
private void emitTuples(Tuple input) {
@@ -114,27 +83,11 @@ public class LoadBolt extends BaseRichBolt {
@Override
public void execute(final Tuple input) {
long startTimeNs = System.nanoTime();
- InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
- if (in == null) {
+ InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
+ sleep.simulateProcessAndExecTime(executorIndex, startTimeNs, in, () -> {
emitTuples(input);
collector.ack(input);
- } else {
- long endExecNs = startTimeNs + toNano(in.execTime.nextRandom(rand));
- long endProcNs = startTimeNs + toNano(in.processTime.nextRandom(rand));
-
- if ((endProcNs - 1_000_000) < endExecNs) {
- mySleep(endProcNs);
- emitTuples(input);
- collector.ack(input);
- } else {
- timer.schedule(() -> {
- emitTuples(input);
- collector.ack(input);
- }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
- }
-
- mySleep(endExecNs);
- }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
index 6548cc8..80f4faf 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -35,6 +35,7 @@ public class LoadCompConf {
public final List<OutputStream> streams;
public final double cpuLoad;
public final double memoryLoad;
+ public final SlowExecutorPattern slp;
/**
* Parse the LoadCompConf from a config Map.
@@ -54,7 +55,11 @@ public class LoadCompConf {
double memoryMb = ObjectReader.getDouble(conf.get("memoryLoad"), 0.0);
double cpuPercent = ObjectReader.getDouble(conf.get("cpuLoad"), 0.0);
- return new LoadCompConf(id, parallelism, streams, memoryMb, cpuPercent);
+ SlowExecutorPattern slp = null;
+ if (conf.containsKey("slowExecutorPattern")) {
+ slp = SlowExecutorPattern.fromConf((Map<String, Object>) conf.get("slowExecutorPattern"));
+ }
+ return new LoadCompConf(id, parallelism, streams, memoryMb, cpuPercent, slp);
}
/**
@@ -79,6 +84,9 @@ public class LoadCompConf {
}
ret.put("streams", streamData);
}
+ if (slp != null) {
+ ret.put("slowExecutorPattern", slp.toConf());
+ }
return ret;
}
@@ -95,7 +103,7 @@ public class LoadCompConf {
.map((orig) -> orig.remap(id, remappedStreams))
.collect(Collectors.toList());
- return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad);
+ return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad, slp);
}
/**
@@ -116,18 +124,31 @@ public class LoadCompConf {
public LoadCompConf setParallel(int newParallelism) {
//We need to adjust the throughput accordingly (so that it stays the same in aggregate)
double throughputAdjustment = ((double)parallelism) / newParallelism;
- return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad).scaleThroughput(throughputAdjustment);
+ return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad, slp).scaleThroughput(throughputAdjustment);
}
/**
* Scale the throughput of this component.
* @param v 1.0 is unchanged 0.5 will cut the throughput in half.
- * @return a copu of this with the adjustments made.
+ * @return a copy of this with the adjustments made.
*/
public LoadCompConf scaleThroughput(double v) {
if (streams != null) {
List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
- return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad);
+ return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad, slp);
+ } else {
+ return this;
+ }
+ }
+
+ /**
+ * Override the SlowExecutorPattern with a new one.
+ * @param slp the new pattern or null if you don't want it to change
+ * @return a copy of this with the adjustments made.
+ */
+ public LoadCompConf overrideSlowExecutorPattern(SlowExecutorPattern slp) {
+ if (slp != null) {
+ return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
} else {
return this;
}
@@ -155,6 +176,7 @@ public class LoadCompConf {
private List<OutputStream> streams;
private double cpuLoad = 0.0;
private double memoryLoad = 0.0;
+ private SlowExecutorPattern slp = null;
public String getId() {
return id;
@@ -206,8 +228,13 @@ public class LoadCompConf {
return this;
}
+ public Builder withSlowExecutorPattern(SlowExecutorPattern slp) {
+ this.slp = slp;
+ return this;
+ }
+
public LoadCompConf build() {
- return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad);
+ return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
}
}
@@ -217,7 +244,8 @@ public class LoadCompConf {
* @param parallelism the parallelism of the component.
* @param streams the output streams of the component.
*/
- public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad) {
+ public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad,
+ SlowExecutorPattern slp) {
this.id = id;
if (id == null) {
throw new IllegalArgumentException("A spout ID cannot be null");
@@ -226,5 +254,6 @@ public class LoadCompConf {
this.streams = streams;
this.cpuLoad = cpuLoad;
this.memoryLoad = memoryLoad;
+ this.slp = slp;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 10ec698..33b4def 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -586,7 +586,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
- static class FixedWidthReporter extends ColumnsFileReporter {
+ static class FixedWidthReporter extends ColumnsFileReporter {
public final String longFormat;
public final String stringFormat;
@@ -649,7 +649,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
}
}
- static class SepValReporter extends ColumnsFileReporter {
+ static class SepValReporter extends ColumnsFileReporter {
private final String separator;
public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
index 20b2926..8edf660 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
@@ -123,7 +123,7 @@ public class NormalDistStats implements Serializable {
}
/**
- * Generate a random number that follows the statistical distribution
+ * Generate a random number that follows the statistical distribution.
* @param rand the random number generator to use
* @return the next number that should follow the statistical distribution.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
index 99357d9..29566b1 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
@@ -105,7 +105,7 @@ public class OutputStream implements Serializable {
}
/**
- * Create a new stream with stats
+ * Create a new stream with stats.
* @param id the id of the stream
* @param rate the rate of tuples being emitted on this stream
* @param areKeysSkewed true if keys are skewed else false. For skewed keys
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
new file mode 100644
index 0000000..d2c3ac5
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * A repeating pattern of skewedness in processing times. This is used to simulate an executor that slows down.
+ */
+public class SlowExecutorPattern implements Serializable {
+ private static final Pattern PARSER = Pattern.compile("\\s*(?<slowness>[^:]+)\\s*(?::\\s*(?<count>[0-9]+))?\\s*");
+ public final double maxSlownessMs;
+ public final int count;
+
+ /**
+ * Parses a string (command line) representation of "<SLOWNESS>(:<COUNT>)?".
+ * @param strRepresentation the string representation to parse
+ * @return the corresponding SlowExecutorPattern.
+ */
+ public static SlowExecutorPattern fromString(String strRepresentation) {
+ Matcher m = PARSER.matcher(strRepresentation);
+ if (!m.matches()) {
+ throw new IllegalArgumentException(strRepresentation + " is not in the form <SLOWNESS>(:<COUNT>)?");
+ }
+ double slownessMs = Double.valueOf(m.group("slowness"));
+ String c = m.group("count");
+ int count = c == null ? 1 : Integer.valueOf(c);
+ return new SlowExecutorPattern(slownessMs, count);
+ }
+
+ /**
+ * Creates a SlowExecutorPattern from a Map config.
+ * @param conf the conf to parse.
+ * @return the corresponding SlowExecutorPattern.
+ */
+ public static SlowExecutorPattern fromConf(Map<String, Object> conf) {
+ double slowness = ObjectReader.getDouble(conf.get("slownessMs"), 0.0);
+ int count = ObjectReader.getInt(conf.get("count"), 1);
+ return new SlowExecutorPattern(slowness, count);
+ }
+
+ /**
+ * Convert this to a Config map.
+ * @return the corresponding Config map to this.
+ */
+ public Map<String, Object> toConf() {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("slownessMs", maxSlownessMs);
+ ret.put("count", count);
+ return ret;
+ }
+
+ public SlowExecutorPattern(double maxSlownessMs, int count) {
+ this.count = count;
+ this.maxSlownessMs = maxSlownessMs;
+ }
+
+ public double getExtraSlowness(int index) {
+ return (index >= count) ? 0 : maxSlownessMs;
+ }
+
+}
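To make the `<SLOWNESS>(:<COUNT>)?` format concrete, here is a small self-contained sketch that reuses the same regular expression as `SlowExecutorPattern` above. The class name `SlowPatternSketch` and the method name `extraSlowness` are stand-ins chosen for illustration. It has no Storm dependency and covers only the string form, not the `fromConf`/`toConf` map form.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SlowPatternSketch {
    // Same expression as PARSER in the diff above.
    private static final Pattern PARSER =
        Pattern.compile("\\s*(?<slowness>[^:]+)\\s*(?::\\s*(?<count>[0-9]+))?\\s*");

    final double slownessMs;
    final int count;

    SlowPatternSketch(double slownessMs, int count) {
        this.slownessMs = slownessMs;
        this.count = count;
    }

    /** Parse "MS" or "MS:COUNT"; COUNT defaults to 1 when omitted. */
    static SlowPatternSketch fromString(String str) {
        Matcher m = PARSER.matcher(str);
        if (!m.matches()) {
            throw new IllegalArgumentException(str + " is not in the form <SLOWNESS>(:<COUNT>)?");
        }
        double slowness = Double.parseDouble(m.group("slowness"));
        String c = m.group("count");
        return new SlowPatternSketch(slowness, c == null ? 1 : Integer.parseInt(c));
    }

    /** Only executors with an index below count get the extra delay. */
    double extraSlowness(int executorIndex) {
        return executorIndex >= count ? 0 : slownessMs;
    }

    public static void main(String[] args) {
        SlowPatternSketch p = fromString("10:2");
        for (int i = 0; i < 4; i++) {
            System.out.println("executor " + i + " extra ms: " + p.extraSlowness(i));
        }
    }
}
```

With the input `10:2`, executors 0 and 1 are slowed by 10 ms and executors 2 and 3 are not slowed. A bare `5.5` slows only executor 0.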
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 3d3a271..3af5803 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -21,7 +21,6 @@ package org.apache.storm.loadgen;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
@@ -32,6 +31,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
@@ -40,6 +40,8 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,12 +88,28 @@ public class ThroughputVsLatency {
}
public static class SplitSentence extends BaseBasicBolt {
+ private ExecAndProcessLatencyEngine sleep;
+ private int executorIndex;
+
+ public SplitSentence(SlowExecutorPattern slowness) {
+ super();
+ sleep = new ExecAndProcessLatencyEngine(slowness);
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context) {
+ executorIndex = context.getThisTaskIndex();
+ sleep.prepare();
+ }
+
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
- String sentence = tuple.getString(0);
- for (String word: sentence.split("\\s+")) {
- collector.emit(new Values(word, 1));
- }
+ sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null , () -> {
+ String sentence = tuple.getString(0);
+ for (String word: sentence.split("\\s+")) {
+ collector.emit(new Values(word, 1));
+ }
+ });
}
@Override
@@ -163,6 +181,13 @@ public class ThroughputVsLatency {
.desc("Number of splitter bolts to use (defaults to " + DEFAULT_NUM_SPLITS + ")")
.build());
options.addOption(Option.builder()
+ .longOpt("splitter-imbalance")
+ .argName("MS(:COUNT)?")
+ .hasArg()
+ .desc("The number of ms that the first COUNT splitters will wait before processing. This creates an imbalance "
+ + "that helps test load aware groupings (defaults to 0:1)")
+ .build());
+ options.addOption(Option.builder()
.longOpt("counters")
.argName("NUM")
.hasArg()
@@ -172,6 +197,7 @@ public class ThroughputVsLatency {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = null;
Exception commandLineException = null;
+ SlowExecutorPattern slowness = null;
double numMins = TEST_EXECUTE_TIME_DEFAULT;
double ratePerSecond = DEFAULT_RATE_PER_SECOND;
String name = DEFAULT_TOPO_NAME;
@@ -198,6 +224,9 @@ public class ThroughputVsLatency {
if (cmd.hasOption("counters")) {
numCounts = Integer.parseInt(cmd.getOptionValue("counters"));
}
+ if (cmd.hasOption("splitter-imbalance")) {
+ slowness = SlowExecutorPattern.fromString(cmd.getOptionValue("splitter-imbalance"));
+ }
} catch (ParseException | NumberFormatException e) {
commandLineException = e;
}
@@ -216,11 +245,12 @@ public class ThroughputVsLatency {
metrics.put("count_parallel", numCounts);
Config conf = new Config();
- LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
+ Map<String, Object> sysConf = Utils.readStormConfig();
+ LoadMetricsServer metricServer = new LoadMetricsServer(sysConf, cmd, metrics);
metricServer.serve();
String url = metricServer.getUrl();
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ NimbusClient client = NimbusClient.getConfiguredClient(sysConf);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
Map<String, String> workerMetrics = new HashMap<>();
@@ -238,7 +268,8 @@ public class ThroughputVsLatency {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new FastRandomSentenceSpout((long) ratePerSecond / numSpouts), numSpouts);
- builder.setBolt("split", new SplitSentence(), numSplits).shuffleGrouping("spout");
+ builder.setBolt("split", new SplitSentence(slowness), numSplits)
+ .shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), numCounts).fieldsGrouping("split", new Fields("word"));
int exitStatus = -1;
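The new --splitter-imbalance option above takes a value of the form MS(:COUNT)? and is parsed by SlowExecutorPattern.fromString, whose source is not part of this file's diff. As a rough illustration only, the sketch below shows one way such a value could be split. The class ImbalanceArgSketch, its nested Imbalance holder, and the choice of COUNT = 1 when the suffix is omitted (inferred from the documented "0:1" default) are all assumptions, not Storm code.

```java
final class ImbalanceArgSketch {
    /** Hypothetical holder; this is NOT Storm's SlowExecutorPattern. */
    static final class Imbalance {
        final double slownessMs;
        final int count;

        Imbalance(double slownessMs, int count) {
            this.slownessMs = slownessMs;
            this.count = count;
        }
    }

    /**
     * Parse "MS" or "MS:COUNT". COUNT falls back to 1 when omitted (an assumption
     * based on the option's documented default of "0:1").
     * Malformed numbers raise NumberFormatException.
     */
    static Imbalance parse(String arg) {
        String[] parts = arg.trim().split(":", 2);
        double ms = Double.parseDouble(parts[0].trim());
        int count = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : 1;
        return new Imbalance(ms, count);
    }
}
```

In this sketch a bad value surfaces as a NumberFormatException, which is one of the exception types the option-parsing block in the diff above already catches.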
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
index b171972..4f297ab 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
@@ -228,7 +228,7 @@ public class TopologyLoadConf {
}
/**
- * The first one that is not null
+ * The first one that is not null.
* @param rest all the other somethings
* @param <V> whatever type you want.
* @return the first one that is not null
@@ -266,6 +266,15 @@ public class TopologyLoadConf {
return ret;
}
+ private LoadCompConf overrideCompSlowExec(LoadCompConf comp, Map<String, SlowExecutorPattern> topoSpecific) {
+ LoadCompConf ret = comp;
+ SlowExecutorPattern slp = topoSpecific.get(name + ":" + comp.id);
+ if (slp != null) {
+ ret = ret.overrideSlowExecutorPattern(slp);
+ }
+ return ret;
+ }
+
/**
* Scale all of the components in the topology by a percentage (but keep the throughput the same).
* @param v the amount to scale them by. 1.0 is nothing, 0.5 cuts them in half, 2.0 doubles them.
@@ -299,6 +308,22 @@ public class TopologyLoadConf {
}
/**
+ * Override the SlowExecutorPattern for given components.
+ * @param topoSpecific what we are going to use to override.
+ * @return a copy of this with the needed adjustments made.
+ */
+ public TopologyLoadConf overrideSlowExecs(Map<String, SlowExecutorPattern> topoSpecific) {
+ if (topoSpecific == null || topoSpecific.isEmpty()) {
+ return this;
+ }
+ List<LoadCompConf> modedSpouts = spouts.stream().map((s) -> overrideCompSlowExec(s, topoSpecific))
+ .collect(Collectors.toList());
+ List<LoadCompConf> modedBolts = bolts.stream().map((b) -> overrideCompSlowExec(b, topoSpecific))
+ .collect(Collectors.toList());
+ return new TopologyLoadConf(name, topoConf, modedSpouts, modedBolts, streams);
+ }
+
+ /**
* Create a new version of this topology with identifiable information removed.
* @return the anonymized version of the TopologyLoadConf.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 0bc56fb..658aadc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -62,7 +62,6 @@ public class Task {
private TopologyContext systemTopologyContext;
private TopologyContext userTopologyContext;
private WorkerTopologyContext workerTopologyContext;
- private LoadMapping loadMapping;
private Integer taskId;
private String componentId;
private Object taskObject; // Spout/Bolt object
@@ -84,7 +83,6 @@ public class Task {
this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
this.workerTopologyContext = executor.getWorkerTopologyContext();
this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
- this.loadMapping = workerData.getLoadMapping();
this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
this.userTopologyContext = mkTopologyContext(workerData.getTopology());
this.taskObject = mkTaskObject();
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 378099c..9e7bd0b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -286,7 +286,7 @@ public class Worker implements Shutdownable, DaemonCommon {
}
public void doRefreshLoad() {
- workerState.refreshLoad();
+ workerState.refreshLoad(executorsAtom.get());
final List<IRunningExecutor> executors = executorsAtom.get();
for (IRunningExecutor executor : executors) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index d244491..33ea579 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -28,6 +28,7 @@ import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.Grouping;
@@ -444,15 +445,20 @@ public class WorkerState {
this.throttleOn.set(backpressure);
}
- public void refreshLoad() {
- Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
+ private static double getQueueLoad(DisruptorQueue q) {
+ DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+ return ((double) qMetrics.population()) / qMetrics.capacity();
+ }
+
+ public void refreshLoad(List<IRunningExecutor> execs) {
+ Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
Long now = System.currentTimeMillis();
- Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
- (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
- (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
- DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
- return ( (double) qMetrics.population()) / qMetrics.capacity();
- }));
+ Map<Integer, Double> localLoad = new HashMap<>();
+ for (IRunningExecutor exec: execs) {
+ double receiveLoad = getQueueLoad(exec.getReceiveQueue());
+ double sendLoad = getQueueLoad(exec.getSendQueue());
+ localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
+ }
Map<Integer, Load> remoteLoad = new HashMap<>();
cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
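The refreshLoad change above reports each executor's local load as the larger of its receive-queue and send-queue fill ratios (population divided by capacity), keyed by the first task id of the executor. A minimal standalone sketch of that calculation follows. ExecQueues is a hypothetical stand-in for IRunningExecutor and DisruptorQueue.QueueMetrics, introduced only so the arithmetic can run outside Storm.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class QueueLoadSketch {
    /** Hypothetical stand-in for one executor's queue metrics; not a Storm type. */
    static final class ExecQueues {
        final int firstTaskId;
        final long recvPopulation;
        final long recvCapacity;
        final long sendPopulation;
        final long sendCapacity;

        ExecQueues(int firstTaskId, long recvPopulation, long recvCapacity,
                   long sendPopulation, long sendCapacity) {
            this.firstTaskId = firstTaskId;
            this.recvPopulation = recvPopulation;
            this.recvCapacity = recvCapacity;
            this.sendPopulation = sendPopulation;
            this.sendCapacity = sendCapacity;
        }
    }

    /** Fill ratio of a queue: 0.0 means empty, 1.0 means full. */
    static double queueLoad(long population, long capacity) {
        return ((double) population) / capacity;
    }

    /** Local load per executor: the worse of the receive and send fill ratios. */
    static Map<Integer, Double> localLoad(List<ExecQueues> execs) {
        Map<Integer, Double> load = new HashMap<>();
        for (ExecQueues e : execs) {
            double receiveLoad = queueLoad(e.recvPopulation, e.recvCapacity);
            double sendLoad = queueLoad(e.sendPopulation, e.sendCapacity);
            load.put(e.firstTaskId, Math.max(receiveLoad, sendLoad));
        }
        return load;
    }
}
```

Taking the maximum means an executor whose receive queue is nearly empty is still reported as loaded when its send queue is backing up, a case the previous receive-queue-only calculation did not reflect.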
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index b787fbb..e55aca0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -115,7 +115,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
protected final IReportError reportError;
protected final Random rand;
- protected final DisruptorQueue transferQueue;
+ protected final DisruptorQueue sendQueue;
protected final DisruptorQueue receiveQueue;
protected Map<Integer, Task> idToTask;
protected final Map<String, String> credentials;
@@ -140,8 +140,8 @@ public abstract class Executor implements Callable, EventHandler<Object> {
this.stormActive = workerData.getIsTopologyActive();
this.stormComponentDebug = workerData.getStormComponentToDebug();
- this.transferQueue = mkExecutorBatchQueue(topoConf, executorId);
- this.executorTransfer = new ExecutorTransfer(workerData, transferQueue, topoConf);
+ this.sendQueue = mkExecutorBatchQueue(topoConf, executorId);
+ this.executorTransfer = new ExecutorTransfer(workerData, sendQueue, topoConf);
this.suicideFn = workerData.getSuicideCallback();
try {
@@ -253,7 +253,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask, receiveQueue, sendQueue);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
@@ -562,7 +562,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
}
public DisruptorQueue getTransferWorkerQueue() {
- return transferQueue;
+ return sendQueue;
}
public IStormClusterState getStormClusterState() {
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 7ea48b0..c7691e4 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -31,6 +31,7 @@ import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DisruptorQueue;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,11 +45,16 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
private final Executor executor;
private final List<Utils.SmartThread> threads;
private final Map<Integer, Task> taskDatas;
+ private final DisruptorQueue receiveQueue;
+ private final DisruptorQueue sendQueue;
- public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+ public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas,
+ DisruptorQueue receiveQueue, DisruptorQueue sendQueue) {
this.executor = executor;
this.threads = threads;
this.taskDatas = taskDatas;
+ this.receiveQueue = receiveQueue;
+ this.sendQueue = sendQueue;
}
@Override
@@ -80,6 +86,16 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
}
@Override
+ public DisruptorQueue getReceiveQueue() {
+ return receiveQueue;
+ }
+
+ @Override
+ public DisruptorQueue getSendQueue() {
+ return sendQueue;
+ }
+
+ @Override
public void shutdown() {
try {
LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index e7a4117..4b7f483 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -22,6 +22,7 @@ import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.grouping.LoadMapping;
import java.util.List;
+import org.apache.storm.utils.DisruptorQueue;
public interface IRunningExecutor {
@@ -30,4 +31,6 @@ public interface IRunningExecutor {
void credentialsChanged(Credentials credentials);
void loadChanged(LoadMapping loadMapping);
boolean getBackPressureFlag();
+ DisruptorQueue getReceiveQueue();
+ DisruptorQueue getSendQueue();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 5d9edf1..4e46dc5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -70,7 +70,7 @@ public class BoltExecutor extends Executor {
((ICredentialsListener) boltObject).setCredentials(credentials);
}
if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue,
"transfer", workerData.getTransferQueue());
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
@@ -78,7 +78,7 @@ public class BoltExecutor extends Executor {
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
} else {
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 6e85f34..c465338 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -118,7 +118,7 @@ public class SpoutExecutor extends Executor {
this.outputCollectors.add(outputCollector);
taskData.getBuiltInMetrics().registerAll(topoConf, taskData.getUserContext());
- Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
if (spoutObject instanceof ICredentialsListener) {
@@ -152,7 +152,7 @@ public class SpoutExecutor extends Executor {
spout.activate();
}
}
- if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+ if (!sendQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
for (ISpout spout : spouts) {
spout.nextTuple();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 0c0560a..f5b63ec 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -15,61 +15,61 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.grouping;
+import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
- private static final int CAPACITY = 1000;
+ static final int CAPACITY = 1000;
+ private static final int MAX_WEIGHT = 100;
+ private static class IndexAndWeights {
+ final int index;
+ int weight;
+
+ IndexAndWeights(int index) {
+ this.index = index;
+ weight = MAX_WEIGHT;
+ }
+ }
+ private final Map<Integer, IndexAndWeights> orig = new HashMap<>();
private Random random;
- private List<Integer>[] rets;
- private int[] targets;
- private int[] loads;
- private int[] unassigned;
- private int[] choices;
- private int[] prepareChoices;
+ @VisibleForTesting
+ List<Integer>[] rets;
+ @VisibleForTesting
+ volatile int[] choices;
+ private volatile int[] prepareChoices;
private AtomicInteger current;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
random = new Random();
- rets = (List<Integer>[])new List<?>[targetTasks.size()];
- targets = new int[targetTasks.size()];
- for (int i = 0; i < targets.length; i++) {
- rets[i] = Arrays.asList(targetTasks.get(i));
- targets[i] = targetTasks.get(i);
+ rets = (List<Integer>[]) new List<?>[targetTasks.size()];
+ int i = 0;
+ for (int target : targetTasks) {
+ rets[i] = Arrays.asList(target);
+ orig.put(target, new IndexAndWeights(i));
+ i++;
}
// can't leave choices to be empty, so initiate it similar as ShuffleGrouping
choices = new int[CAPACITY];
- for (int i = 0 ; i < CAPACITY ; i++) {
- choices[i] = i % rets.length;
- }
-
- shuffleArray(choices);
- current = new AtomicInteger(-1);
-
+ current = new AtomicInteger(0);
// allocate another array to be switched
prepareChoices = new int[CAPACITY];
-
- // allocating only once
- loads = new int[targets.length];
- unassigned = new int[targets.length];
+ updateRing(null);
}
@Override
@@ -93,48 +93,44 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
updateRing(loadMapping);
}
- private void updateRing(LoadMapping load) {
- int localTotal = 0;
- for (int i = 0 ; i < targets.length; i++) {
- int val = (int)(101 - (load.get(targets[i]) * 100));
- loads[i] = val;
- localTotal += val;
+ private synchronized void updateRing(LoadMapping load) {
+ //We will adjust weights based off of the minimum load
+ double min = load == null ? 0 : orig.keySet().stream().mapToDouble((key) -> load.get(key)).min().getAsDouble();
+ for (Map.Entry<Integer, IndexAndWeights> target: orig.entrySet()) {
+ IndexAndWeights val = target.getValue();
+ double l = load == null ? 0.0 : load.get(target.getKey());
+ if (l <= min + (0.05)) {
+ //We assume that within 5% of the minimum congestion is still fine.
+ //Not congested we grow (but slowly)
+ val.weight = Math.min(MAX_WEIGHT, val.weight + 1);
+ } else {
+ //Congested we contract much more quickly
+ val.weight = Math.max(0, val.weight - 10);
+ }
}
+ //Now we need to build the array
+ long weightSum = orig.values().stream().mapToLong((w) -> w.weight).sum();
+ //Now we can calculate a percentage
int currentIdx = 0;
- int unassignedIdx = 0;
- for (int i = 0 ; i < loads.length ; i++) {
- if (currentIdx == CAPACITY) {
- break;
- }
-
- int loadForTask = loads[i];
- int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
- // assign at least one for task
- if (amount == 0) {
- unassigned[unassignedIdx++] = i;
- }
- for (int j = 0; j < amount; j++) {
- if (currentIdx == CAPACITY) {
- break;
+ if (weightSum > 0) {
+ for (IndexAndWeights indexAndWeights : orig.values()) {
+ int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY);
+ for (int i = 0; i < count && currentIdx < CAPACITY; i++) {
+ prepareChoices[currentIdx] = indexAndWeights.index;
+ currentIdx++;
}
-
- prepareChoices[currentIdx++] = i;
}
- }
- if (currentIdx < CAPACITY) {
- // if there're some rooms, give unassigned tasks a chance to be included
- // this should be really small amount, so just add them sequentially
- if (unassignedIdx > 0) {
- for (int i = currentIdx ; i < CAPACITY ; i++) {
- prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
- }
- } else {
- // just pick random
- for (int i = currentIdx ; i < CAPACITY ; i++) {
- prepareChoices[i] = random.nextInt(loads.length);
- }
+ //in case we didn't fill in enough
+ for (; currentIdx < CAPACITY; currentIdx++) {
+ prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+ }
+ } else {
+ //This really should be impossible, because we go off of the min load, and inc anything within 5% of it.
+ // But just to be sure it is never an issue, especially with float rounding etc.
+ for (;currentIdx < CAPACITY; currentIdx++) {
+ prepareChoices[currentIdx] = currentIdx % rets.length;
}
}
@@ -160,5 +156,4 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
arr[i] = arr[j];
arr[j] = tmp;
}
-
-}
+}
\ No newline at end of file
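The rewritten updateRing above keeps a weight per target: targets whose load is within 0.05 of the minimum gain 1 (capped at MAX_WEIGHT), all others lose 10 (floored at 0), and each target then receives a share of the CAPACITY-slot ring proportional to its weight. The sketch below reproduces only that arithmetic in isolation. The class name LoadWeightSketch and the plain maps used in place of IndexAndWeights and LoadMapping are assumptions made for illustration, and the filling of leftover slots is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LoadWeightSketch {
    static final int CAPACITY = 1000;
    static final int MAX_WEIGHT = 100;

    /** One refresh step: grow slowly near the minimum load, shrink quickly otherwise. */
    static void adjust(Map<Integer, Integer> weights, Map<Integer, Double> load) {
        double min = weights.keySet().stream()
            .mapToDouble(task -> load.getOrDefault(task, 0.0))
            .min()
            .orElse(0.0);
        for (Map.Entry<Integer, Integer> e : weights.entrySet()) {
            double l = load.getOrDefault(e.getKey(), 0.0);
            if (l <= min + 0.05) {
                e.setValue(Math.min(MAX_WEIGHT, e.getValue() + 1));
            } else {
                e.setValue(Math.max(0, e.getValue() - 10));
            }
        }
    }

    /** Ring slots per target before any leftover slots are filled (truncating division). */
    static Map<Integer, Integer> slots(Map<Integer, Integer> weights) {
        long weightSum = weights.values().stream().mapToLong(Integer::longValue).sum();
        Map<Integer, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> e : weights.entrySet()) {
            int count = weightSum == 0 ? 0 : (int) ((e.getValue() / (double) weightSum) * CAPACITY);
            out.put(e.getKey(), count);
        }
        return out;
    }
}
```

With two targets starting at weight 100 and loads of 1.0 and 0.0, one refresh leaves the weights at 90 and 100, so the targets get 473 and 526 of the 1000 slots; the single remaining slot is what the "in case we didn't fill in enough" loop in the diff covers.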
http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 53ac404..a5f9304 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -19,12 +19,12 @@ package org.apache.storm.grouping;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.storm.StormTimer;
+import java.util.Arrays;
import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.task.WorkerTopologyContext;
-import org.apache.storm.utils.Utils;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -55,6 +55,64 @@ public class LoadAwareShuffleGroupingTest {
private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
@Test
+ public void testUnevenLoadOverTime() throws Exception {
+ LoadAwareShuffleGrouping grouping = new LoadAwareShuffleGrouping();
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouping.prepare(context, new GlobalStreamId("a", "default"), Arrays.asList(1, 2));
+ double expectedOneWeight = 100.0;
+ double expectedTwoWeight = 100.0;
+
+ Map<Integer, Double> localLoad = new HashMap<>();
+ localLoad.put(1, 1.0);
+ localLoad.put(2, 0.0);
+ LoadMapping lm = new LoadMapping();
+ lm.setLocal(localLoad);
+ //First verify that if something has a high load its distribution will drop over time
+ for (int i = 9; i >= 0; i--) {
+ grouping.refreshLoad(lm);
+ expectedOneWeight -= 10.0;
+ Map<Integer, Double> countByType = count(grouping.choices, grouping.rets);
+ LOG.info("contByType = {}", countByType);
+ double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
+ double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
+ assertEquals("i = " + i,
+ expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ 0.01);
+ assertEquals("i = " + i,
+ expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ 0.01);
+ }
+
+ //Now verify that when it is switched we can recover
+ localLoad.put(1, 0.0);
+ localLoad.put(2, 1.0);
+ lm.setLocal(localLoad);
+
+ while (expectedOneWeight < 100.0) {
+ grouping.refreshLoad(lm);
+ expectedOneWeight += 1.0;
+ expectedTwoWeight = Math.max(0.0, expectedTwoWeight - 10.0);
+ Map<Integer, Double> countByType = count(grouping.choices, grouping.rets);
+ LOG.info("contByType = {}", countByType);
+ double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
+ double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
+ assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ 0.01);
+ assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ 0.01);
+ }
+ }
+
+ private Map<Integer,Double> count(int[] choices, List<Integer>[] rets) {
+ Map<Integer, Double> ret = new HashMap<>();
+ for (int i : choices) {
+ int task = rets[i].get(0);
+ ret.put(task, ret.getOrDefault(task, 0.0) + 1);
+ }
+ return ret;
+ }
+
+ @Test
public void testLoadAwareShuffleGroupingWithEvenLoad() {
// just pick arbitrary number
final int numTasks = 7;
@@ -161,36 +219,6 @@ public class LoadAwareShuffleGroupingTest {
}
@Test
- public void testLoadAwareShuffleGroupingWithUnevenLoad() {
- // just pick arbitrary number
- final int numTasks = 7;
- final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
-
- // Define our taskIds and loads
- final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- final LoadMapping loadMapping = buildLocalTasksUnevenLoadMapping(availableTaskIds);
-
- runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
- loadMapping);
- }
-
- @Test
- public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
- for (int trial = 0 ; trial < 200 ; trial++) {
- // just pick arbitrary number in 5 ~ 100
- final int numTasks = new Random().nextInt(96) + 5;
- final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
-
- // Define our taskIds and loads
- final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- final LoadMapping loadMapping = buildLocalTasksRandomLoadMapping(availableTaskIds);
-
- runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
- loadMapping);
- }
- }
-
- @Test
public void testShuffleLoadEven() {
// port test-shuffle-load-even
LoadAwareCustomStreamGrouping shuffler = GrouperFactory
@@ -228,46 +256,6 @@ public class LoadAwareShuffleGroupingTest {
assertTrue(load2 <= maxPrCount);
}
- @Test
- public void testShuffleLoadUneven() {
- // port test-shuffle-load-uneven
- LoadAwareCustomStreamGrouping shuffler = GrouperFactory
- .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
- Lists.newArrayList(1, 2), Collections.emptyMap());
- int numMessages = 100000;
- int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
- int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
- int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
- int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
- LoadMapping load = new LoadMapping();
- Map<Integer, Double> loadInfoMap = new HashMap<>();
- loadInfoMap.put(1, 0.5);
- loadInfoMap.put(2, 0.0);
- load.setLocal(loadInfoMap);
-
- // force triggers building ring
- shuffler.refreshLoad(load);
-
- List<Object> data = Lists.newArrayList(1, 2);
- int[] frequencies = new int[3]; // task id starts from 1
- for (int i = 0 ; i < numMessages ; i++) {
- List<Integer> tasks = shuffler.chooseTasks(1, data);
- for (int task : tasks) {
- frequencies[task]++;
- }
- }
-
- int load1 = frequencies[1];
- int load2 = frequencies[2];
-
- LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
-
- assertTrue(load1 >= min1PrCount);
- assertTrue(load1 <= max1PrCount);
- assertTrue(load2 >= min2PrCount);
- assertTrue(load2 <= max2PrCount);
- }
-
@Ignore
@Test
public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
@@ -374,37 +362,6 @@ public class LoadAwareShuffleGroupingTest {
return taskCounts;
}
- private void runDistributionVerificationTestWithUnevenLoad(int numTasks,
- LoadAwareShuffleGrouping grouper, List<Integer> availableTaskIds,
- LoadMapping loadMapping) {
- int[] loads = new int[numTasks];
- int localTotal = 0;
- List<Double> loadRate = new ArrayList<>();
- for (int i = 0; i < numTasks; i++) {
- int val = (int)(101 - (loadMapping.get(i) * 100));
- loads[i] = val;
- localTotal += val;
- }
-
- for (int i = 0; i < numTasks; i++) {
- loadRate.add(loads[i] * 1.0 / localTotal);
- }
-
- WorkerTopologyContext context = mock(WorkerTopologyContext.class);
- grouper.prepare(context, null, availableTaskIds);
-
- // Keep track of how many times we see each taskId
- int totalEmits = 5000 * numTasks;
- int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
-
- int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
- for (int i = 0; i < numTasks; i++) {
- int expected = (int) (totalEmits * loadRate.get(i));
- assertTrue("Distribution should respect the task load with small delta",
- taskCounts[i] >= expected - delta && taskCounts[i] <= expected + delta);
- }
- }
-
private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
List<Integer> availableTaskIds, LoadMapping loadMapping) {
// Task Id not used, so just pick a static value
[2/2] storm git commit: Merge branch 'STORM-2733' of https://github.com/revans2/incubator-storm into STORM-2733
Posted by bo...@apache.org.
Merge branch 'STORM-2733' of https://github.com/revans2/incubator-storm into STORM-2733
STORM-2733: Better load aware shuffle implementation
This closes #2321
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/124acb92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/124acb92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/124acb92
Branch: refs/heads/master
Commit: 124acb92dff04a57b530ab4d95a698abc8ff46d9
Parents: 7901051 b9f1d7e
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 14 10:15:16 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 14 10:15:16 2017 -0500
----------------------------------------------------------------------
examples/storm-loadgen/README.md | 5 +
examples/storm-loadgen/pom.xml | 2 +-
.../loadgen/ExecAndProcessLatencyEngine.java | 119 ++++++++++++++
.../java/org/apache/storm/loadgen/GenLoad.java | 25 +++
.../java/org/apache/storm/loadgen/LoadBolt.java | 63 +------
.../org/apache/storm/loadgen/LoadCompConf.java | 43 ++++-
.../apache/storm/loadgen/LoadMetricsServer.java | 4 +-
.../apache/storm/loadgen/NormalDistStats.java | 2 +-
.../org/apache/storm/loadgen/OutputStream.java | 2 +-
.../storm/loadgen/SlowExecutorPattern.java | 83 ++++++++++
.../storm/loadgen/ThroughputVsLatency.java | 47 +++++-
.../apache/storm/loadgen/TopologyLoadConf.java | 27 ++-
.../src/jvm/org/apache/storm/daemon/Task.java | 2 -
.../org/apache/storm/daemon/worker/Worker.java | 2 +-
.../apache/storm/daemon/worker/WorkerState.java | 22 ++-
.../jvm/org/apache/storm/executor/Executor.java | 10 +-
.../apache/storm/executor/ExecutorShutdown.java | 18 +-
.../apache/storm/executor/IRunningExecutor.java | 3 +
.../storm/executor/bolt/BoltExecutor.java | 4 +-
.../storm/executor/spout/SpoutExecutor.java | 4 +-
.../grouping/LoadAwareShuffleGrouping.java | 129 +++++++--------
.../grouping/LoadAwareShuffleGroupingTest.java | 163 +++++++------------
22 files changed, 512 insertions(+), 267 deletions(-)
----------------------------------------------------------------------