Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/14 15:32:15 UTC

[1/2] storm git commit: STORM-2733: Better load aware shuffle implementation

Repository: storm
Updated Branches:
  refs/heads/master 79010512d -> 124acb92d


STORM-2733: Better load aware shuffle implementation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9f1d7ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9f1d7ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9f1d7ef

Branch: refs/heads/master
Commit: b9f1d7efd96cf2fb1393f7a2e4277e3fb68b21bc
Parents: 6d23b8b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Aug 24 11:13:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 11 15:38:03 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |   5 +
 examples/storm-loadgen/pom.xml                  |   2 +-
 .../loadgen/ExecAndProcessLatencyEngine.java    | 119 ++++++++++++++
 .../java/org/apache/storm/loadgen/GenLoad.java  |  25 +++
 .../java/org/apache/storm/loadgen/LoadBolt.java |  63 +------
 .../org/apache/storm/loadgen/LoadCompConf.java  |  43 ++++-
 .../apache/storm/loadgen/LoadMetricsServer.java |   4 +-
 .../apache/storm/loadgen/NormalDistStats.java   |   2 +-
 .../org/apache/storm/loadgen/OutputStream.java  |   2 +-
 .../storm/loadgen/SlowExecutorPattern.java      |  83 ++++++++++
 .../storm/loadgen/ThroughputVsLatency.java      |  47 +++++-
 .../apache/storm/loadgen/TopologyLoadConf.java  |  27 ++-
 .../src/jvm/org/apache/storm/daemon/Task.java   |   2 -
 .../org/apache/storm/daemon/worker/Worker.java  |   2 +-
 .../apache/storm/daemon/worker/WorkerState.java |  22 ++-
 .../jvm/org/apache/storm/executor/Executor.java |  10 +-
 .../apache/storm/executor/ExecutorShutdown.java |  18 +-
 .../apache/storm/executor/IRunningExecutor.java |   3 +
 .../storm/executor/bolt/BoltExecutor.java       |   4 +-
 .../storm/executor/spout/SpoutExecutor.java     |   4 +-
 .../grouping/LoadAwareShuffleGrouping.java      | 129 +++++++--------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 163 +++++++------------
 22 files changed, 512 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/README.md
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/README.md b/examples/storm-loadgen/README.md
index 5a6a030..88ad5f9 100644
--- a/examples/storm-loadgen/README.md
+++ b/examples/storm-loadgen/README.md
@@ -47,6 +47,7 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_
 | -t,--test-time &lt;MINS> | How long to run the tests for in mins (defaults to 5) |
 | --throughput &lt;MULTIPLIER(:TOPO:COMP)?> | How much to scale the topology up or down in throughput. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one.(defaults to 1.0 no scaling)|
 | -w,--report-window &lt;INTERVAL_SECS> | How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary. default 30|
+| --imbalance &lt;MS(:COUNT)?:TOPO:COMP> | The number of ms that the first COUNT of TOPO:COMP will wait before processing.  This creates an imbalance that helps test load aware groupings. By default there is no imbalance unless specified by the capture file. |
 
 ## ThroughputVsLatency
 A word count topology with metrics reporting like the `GenLoad` command.
@@ -68,6 +69,7 @@ storm jar storm-loadgen.jar org.apache.storm.loadgen.ThroughputVsLatency [option
 | --spouts &lt;NUM>| Number of spouts to use (defaults to 1) |
 | -t,--test-time &lt;MINS>| How long to run the tests for in mins (defaults to 5) |
 | -w,--report-window &lt;INTERVAL_SECS>| How long of a rolling window should be in each report.  Will be rounded up to the next report interval boundary.|
+| --splitter-imbalance &lt;MS(:COUNT)?> | The number of ms that the first COUNT splitters will wait before processing.  This creates an imbalance that helps test load aware groupings (defaults to 0:1)|
 
 # Reporters
 Reporters provide a way to store various statistics about a running topology. There are currently a few supported reporters
@@ -140,6 +142,7 @@ There are a lot of different metrics supported
 |throughput_adjust| The adjustment to the throughput in `GenLoad`. | GenLoad
 |topo_throughput| A list of topology/component specific adjustment rules to the throughput in `GenLoad`. | GenLoad
 |local\_or\_shuffle| true if shuffles were replaced with local or shuffle in GenLoad. | GenLoad
+|slow\_execs| A list of topology/component specific adjustment rules to the slowExecutorPattern in `GenLoad`. | GenLoad
 
 There are also some generic rules that you can use for some metrics.  Any metric that starts with `"conf:"` will be the config for that.  It does not include config overrides from the `GenLoad` file.
 
@@ -170,6 +173,8 @@ Spouts and bolts have the same format.
 | streams | The streams that are produced by this bolt or spout |
 | cpuLoad | The number of cores this component needs for resource aware scheduling |
 | memoryLoad | The amount of memory in MB that this component needs for resource aware scheduling |
+| slowExecutorPattern.slownessMs | an optional number of ms to slow down the exec + process latency for some executors of this component (defaults to 0) |
+| slowExecutorPattern.count | the number of executors of this component to slow down (defaults to 1) |
 
 ### Output Streams
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/pom.xml b/examples/storm-loadgen/pom.xml
index aa43638..e79e14c 100644
--- a/examples/storm-loadgen/pom.xml
+++ b/examples/storm-loadgen/pom.xml
@@ -114,7 +114,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
         <artifactId>maven-checkstyle-plugin</artifactId>
         <!--Note - the version would be inherited-->
         <configuration>
-          <maxAllowedViolations>3</maxAllowedViolations>
+          <maxAllowedViolations>0</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
new file mode 100644
index 0000000..2f5257d
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ExecAndProcessLatencyEngine.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A more accurate sleep implementation.
+ */
+public class ExecAndProcessLatencyEngine implements Serializable {
+    private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
+    private final SlowExecutorPattern skewedPattern;
+
+    public static long toNano(double ms) {
+        return (long)(ms * NANO_IN_MS);
+    }
+
+    private final AtomicLong parkOffset = new AtomicLong(0);
+    private Random rand;
+    private ScheduledExecutorService timer;
+
+    public ExecAndProcessLatencyEngine() {
+        this(null);
+    }
+
+    public ExecAndProcessLatencyEngine(SlowExecutorPattern skewedPattern) {
+        this.skewedPattern = skewedPattern;
+    }
+
+    public void prepare() {
+        this.rand = ThreadLocalRandom.current();
+        this.timer = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    /**
+     * Sleep for a set number of nano seconds.
+     * @param start the start time of the sleep
+     * @param sleepAmount how many nano seconds after start when we should stop.
+     */
+    public void sleepNano(long start, long sleepAmount) {
+        long endTime = start + sleepAmount;
+        // A small control algorithm to adjust the amount of time that we sleep to make it more accurate
+        long newEnd = endTime - parkOffset.get();
+        long diff = newEnd - start;
+        //There are some different levels of accuracy here, and we want to deal with all of them
+        if (diff <= 1_000) {
+            //We are done, nothing that short is going to work here
+        } else if (diff < NANO_IN_MS) {
+            //Busy wait...
+            long sum = 0;
+            while (System.nanoTime() < newEnd) {
+                for (long i = 0; i < 1_000_000; i++) {
+                    sum += i;
+                }
+            }
+        } else {
+            //More accurate than Thread.sleep, but still not great
+            LockSupport.parkNanos(newEnd - System.nanoTime());
+        }
+        parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+    }
+
+    public void sleepNano(long nano) {
+        sleepNano(System.nanoTime(), nano);
+    }
+
+    public void sleepUntilNano(long endTime) {
+        long start = System.nanoTime();
+        sleepNano(start, endTime - start);
+    }
+
+    /**
+     * Simulate both process and exec times.
+     * @param executorIndex the index of this executor.  It is used to skew the latencies.
+     * @param startTimeNs when the executor started in nano-seconds.
+     * @param in the metrics for the input stream (or null if you don't want to use them).
+     * @param r what to run when the process latency is up.  Note that this may run on a separate thread after this method call has
+     *     completed.
+     */
+    public void simulateProcessAndExecTime(int executorIndex, long startTimeNs, InputStream in, Runnable r) {
+        long extraTimeNs = skewedPattern == null ? 0 : toNano(skewedPattern.getExtraSlowness(executorIndex));
+        long endExecNs = startTimeNs + extraTimeNs + (in == null ? 0 : ExecAndProcessLatencyEngine.toNano(in.execTime.nextRandom(rand)));
+        long endProcNs = startTimeNs + extraTimeNs + (in == null ? 0 : ExecAndProcessLatencyEngine.toNano(in.processTime.nextRandom(rand)));
+
+        if ((endProcNs - 1_000_000) < endExecNs) {
+            sleepUntilNano(endProcNs);
+            r.run();
+        } else {
+            timer.schedule(() -> {
+                r.run();
+            }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
+        }
+
+        sleepUntilNano(endExecNs);
+    }
+}
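
For readers following the patch, the three accuracy tiers that `sleepNano` chooses between can be restated as a small standalone sketch. The class `SleepTierSketch`, the `Tier` enum, and the `chooseTier` method are hypothetical names used only for illustration and are not part of this commit; only the thresholds (1,000 ns and one millisecond) and the `parkOffset` adjustment are taken from the diff above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the commit: classifies a requested
// wait the same way the if/else chain in sleepNano does, without sleeping.
class SleepTierSketch {
    enum Tier { SKIP, BUSY_WAIT, PARK }

    static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);

    // start, sleepAmount and parkOffset mirror the values sleepNano works with.
    static Tier chooseTier(long start, long sleepAmount, long parkOffset) {
        long endTime = start + sleepAmount;
        long newEnd = endTime - parkOffset; // compensate for earlier overshoot
        long diff = newEnd - start;
        if (diff <= 1_000) {
            return Tier.SKIP;       // too short to wait for at all
        } else if (diff < NANO_IN_MS) {
            return Tier.BUSY_WAIT;  // under 1 ms: spin on System.nanoTime()
        }
        return Tier.PARK;           // 1 ms or more: LockSupport.parkNanos
    }

    public static void main(String[] args) {
        System.out.println(chooseTier(0, 500, 0));
        System.out.println(chooseTier(0, 500_000, 0));
        System.out.println(chooseTier(0, 5_000_000, 0));
        // A 1.2 ms request with a 0.3 ms accumulated offset drops into the busy-wait tier.
        System.out.println(chooseTier(0, 1_200_000, 300_000));
    }
}
```

The last call shows why the offset matters: once earlier sleeps have overshot, a request slightly above one millisecond is served by the busy-wait branch rather than by parking.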

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
index f535b32..141f11a 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/GenLoad.java
@@ -100,6 +100,13 @@ public class GenLoad {
             .desc("replace shuffle grouping with local or shuffle grouping")
             .build());
         options.addOption(Option.builder()
+            .longOpt("imbalance")
+            .argName("MS(:COUNT)?:TOPO:COMP")
+            .hasArg()
+            .desc("The number of ms that the first COUNT of TOPO:COMP will wait before processing.  This creates an imbalance "
+                + "that helps test load aware groupings. By default there is no imbalance.  If no count is given it defaults to 1")
+            .build());
+        options.addOption(Option.builder()
             .longOpt("debug")
             .desc("Print debug information about the adjusted topology before submitting it.")
             .build());
@@ -112,6 +119,7 @@ public class GenLoad {
         Map<String, Double> topoSpecificParallel = new HashMap<>();
         double globalThroughput = 1.0;
         Map<String, Double> topoSpecificThroughput = new HashMap<>();
+        Map<String, SlowExecutorPattern> topoSpecificImbalance = new HashMap<>();
         try {
             cmd = parser.parse(options, args);
             if (cmd.hasOption("t")) {
@@ -161,6 +169,20 @@ public class GenLoad {
                     }
                 }
             }
+            if (cmd.hasOption("imbalance")) {
+                for (String stringImbalance : cmd.getOptionValues("imbalance")) {
+                    //We require there to be both a topology and a component in this case, so parse it out as such.
+                    String [] parts = stringImbalance.split(":");
+                    if (parts.length < 3 || parts.length > 4) {
+                        throw new ParseException(stringImbalance + " does not appear to match the expected pattern");
+                    } else if (parts.length == 3) {
+                        topoSpecificImbalance.put(parts[1] + ":" + parts[2], SlowExecutorPattern.fromString(parts[0]));
+                    } else { //== 4
+                        topoSpecificImbalance.put(parts[2] + ":" + parts[3],
+                            SlowExecutorPattern.fromString(parts[0] + ":" + parts[1]));
+                    }
+                }
+            }
         } catch (ParseException | NumberFormatException e) {
             commandLineException = e;
         }
@@ -179,6 +201,8 @@ public class GenLoad {
             .collect(Collectors.toList()));
         metrics.put("topo_throughput", topoSpecificThroughput.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
             .collect(Collectors.toList()));
+        metrics.put("slow_execs", topoSpecificImbalance.entrySet().stream().map((entry) -> entry.getValue() + ":" + entry.getKey())
+            .collect(Collectors.toList()));
 
         Config conf = new Config();
         LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
@@ -193,6 +217,7 @@ public class GenLoad {
                     TopologyLoadConf tlc = readTopology(topoFile);
                     tlc = tlc.scaleParallel(globalParallel, topoSpecificParallel);
                     tlc = tlc.scaleThroughput(globalThroughput, topoSpecificThroughput);
+                    tlc = tlc.overrideSlowExecs(topoSpecificImbalance);
                     if (cmd.hasOption("local-or-shuffle")) {
                         tlc = tlc.replaceShuffleWithLocalOrShuffle();
                     }
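
The `--imbalance` parsing added above splits `MS(:COUNT)?:TOPO:COMP` into a `TOPO:COMP` key and a pattern string. A minimal standalone sketch of that split follows; the class `ImbalanceArgSketch` and its `split` method are hypothetical, and the pattern is kept as a plain string rather than passed to `SlowExecutorPattern.fromString` so the example compiles without Storm on the classpath.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical illustration, not part of the commit: mirrors the 3-part and
// 4-part cases handled in GenLoad's "imbalance" option parsing.
class ImbalanceArgSketch {
    // Returns an entry of "TOPO:COMP" -> "MS" or "MS:COUNT".
    static Map.Entry<String, String> split(String arg) {
        String[] parts = arg.split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException(arg + " does not appear to match the expected pattern");
        } else if (parts.length == 3) {
            // MS:TOPO:COMP, the count is left to its default
            return new SimpleEntry<>(parts[1] + ":" + parts[2], parts[0]);
        }
        // MS:COUNT:TOPO:COMP
        return new SimpleEntry<>(parts[2] + ":" + parts[3], parts[0] + ":" + parts[1]);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> withCount = split("50:2:wordcount:split");
        System.out.println(withCount.getKey() + " -> " + withCount.getValue());
        Map.Entry<String, String> noCount = split("50:wordcount:split");
        System.out.println(noCount.getKey() + " -> " + noCount.getValue());
    }
}
```

The topology and component names `wordcount` and `split` are made up for the example.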

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
index e02e8f8..28b611f 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -28,8 +28,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.OutputCollector;
@@ -47,20 +45,16 @@ import org.slf4j.LoggerFactory;
  */
 public class LoadBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(LoadBolt.class);
-    private static final long NANO_IN_MS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
     private final List<OutputStream> outputStreamStats;
     private List<OutputStreamEngine> outputStreams;
     private final Map<GlobalStreamId, InputStream> inputStreams = new HashMap<>();
     private OutputCollector collector;
-    private Random rand;
-    private ScheduledExecutorService timer;
-
-    private static long toNano(double ms) {
-        return (long)(ms * NANO_IN_MS);
-    }
+    private final ExecAndProcessLatencyEngine sleep;
+    private int executorIndex;
 
     public LoadBolt(LoadCompConf conf) {
         this.outputStreamStats = Collections.unmodifiableList(new ArrayList<>(conf.streams));
+        sleep = new ExecAndProcessLatencyEngine(conf.slp);
     }
 
     public void add(InputStream inputStream) {
@@ -73,33 +67,8 @@ public class LoadBolt extends BaseRichBolt {
         outputStreams = Collections.unmodifiableList(outputStreamStats.stream()
             .map((ss) -> new OutputStreamEngine(ss)).collect(Collectors.toList()));
         this.collector = collector;
-        this.rand = ThreadLocalRandom.current();
-        this.timer = Executors.newSingleThreadScheduledExecutor();
-    }
-
-    private final AtomicLong parkOffset = new AtomicLong(0);
-
-    private void mySleep(long endTime) {
-        //There are some different levels of accuracy here, and we want to deal with all of them
-        long start = System.nanoTime();
-        long newEnd = endTime - parkOffset.get();
-        long diff = newEnd - start;
-        if (diff <= 1_000) {
-            //We are done, nothing that short is going to work here
-        } else if (diff < NANO_IN_MS) {
-            //Busy wait...
-            long sum = 0;
-            while (System.nanoTime() < newEnd) {
-                for (long i = 0; i < 1_000_000; i++) {
-                    sum += i;
-                }
-            }
-        } else {
-            //More accurate that thread.sleep, but still not great
-            LockSupport.parkNanos(newEnd - System.nanoTime() - parkOffset.get());
-            // A small control algorithm to adjust the amount of time that we sleep to make it more accurate
-        }
-        parkOffset.addAndGet((System.nanoTime() - endTime) / 2);
+        executorIndex = context.getThisTaskIndex();
+        sleep.prepare();
     }
 
     private void emitTuples(Tuple input) {
@@ -114,27 +83,11 @@ public class LoadBolt extends BaseRichBolt {
     @Override
     public void execute(final Tuple input) {
         long startTimeNs = System.nanoTime();
-        InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
-        if (in == null) {
+        InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
+        sleep.simulateProcessAndExecTime(executorIndex, startTimeNs, in, () -> {
             emitTuples(input);
             collector.ack(input);
-        } else {
-            long endExecNs = startTimeNs + toNano(in.execTime.nextRandom(rand));
-            long endProcNs = startTimeNs + toNano(in.processTime.nextRandom(rand));
-
-            if ((endProcNs - 1_000_000) < endExecNs) {
-                mySleep(endProcNs);
-                emitTuples(input);
-                collector.ack(input);
-            } else {
-                timer.schedule(() -> {
-                    emitTuples(input);
-                    collector.ack(input);
-                }, Math.max(0, endProcNs - System.nanoTime()), TimeUnit.NANOSECONDS);
-            }
-
-            mySleep(endExecNs);
-        }
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
index 6548cc8..80f4faf 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java
@@ -35,6 +35,7 @@ public class LoadCompConf {
     public final List<OutputStream> streams;
     public final double cpuLoad;
     public final double memoryLoad;
+    public final SlowExecutorPattern slp;
 
     /**
      * Parse the LoadCompConf from a config Map.
@@ -54,7 +55,11 @@ public class LoadCompConf {
         double memoryMb = ObjectReader.getDouble(conf.get("memoryLoad"), 0.0);
         double cpuPercent = ObjectReader.getDouble(conf.get("cpuLoad"), 0.0);
 
-        return new LoadCompConf(id, parallelism, streams, memoryMb, cpuPercent);
+        SlowExecutorPattern slp = null;
+        if (conf.containsKey("slowExecutorPattern")) {
+            slp = SlowExecutorPattern.fromConf((Map<String, Object>) conf.get("slowExecutorPattern"));
+        }
+        return new LoadCompConf(id, parallelism, streams, cpuPercent, memoryMb, slp);
     }
 
     /**
@@ -79,6 +84,9 @@ public class LoadCompConf {
             }
             ret.put("streams", streamData);
         }
+        if (slp != null) {
+            ret.put("slowExecutorPattern", slp.toConf());
+        }
         return ret;
     }
 
@@ -95,7 +103,7 @@ public class LoadCompConf {
                 .map((orig) -> orig.remap(id, remappedStreams))
                 .collect(Collectors.toList());
 
-        return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad);
+        return new LoadCompConf(remappedId, parallelism, remappedOutStreams, cpuLoad, memoryLoad, slp);
     }
 
     /**
@@ -116,18 +124,31 @@ public class LoadCompConf {
     public LoadCompConf setParallel(int newParallelism) {
         //We need to adjust the throughput accordingly (so that it stays the same in aggregate)
         double throughputAdjustment = ((double)parallelism) / newParallelism;
-        return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad).scaleThroughput(throughputAdjustment);
+        return new LoadCompConf(id, newParallelism, streams, cpuLoad, memoryLoad, slp).scaleThroughput(throughputAdjustment);
     }
 
     /**
      * Scale the throughput of this component.
      * @param v 1.0 is unchanged 0.5 will cut the throughput in half.
-     * @return a copu of this with the adjustments made.
+     * @return a copy of this with the adjustments made.
      */
     public LoadCompConf scaleThroughput(double v) {
         if (streams != null) {
             List<OutputStream> newStreams = streams.stream().map((s) -> s.scaleThroughput(v)).collect(Collectors.toList());
-            return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad);
+            return new LoadCompConf(id, parallelism, newStreams, cpuLoad, memoryLoad, slp);
+        } else {
+            return this;
+        }
+    }
+
+    /**
+     * Override the SlowExecutorPattern with a new one.
+     * @param slp the new pattern or null if you don't want it to change
+     * @return a copy of this with the adjustments made.
+     */
+    public LoadCompConf overrideSlowExecutorPattern(SlowExecutorPattern slp) {
+        if (slp != null) {
+            return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
         } else {
             return this;
         }
@@ -155,6 +176,7 @@ public class LoadCompConf {
         private List<OutputStream> streams;
         private double cpuLoad = 0.0;
         private double memoryLoad = 0.0;
+        private SlowExecutorPattern slp = null;
 
         public String getId() {
             return id;
@@ -206,8 +228,13 @@ public class LoadCompConf {
             return this;
         }
 
+        public Builder withSlowExecutorPattern(SlowExecutorPattern slp) {
+            this.slp = slp;
+            return this;
+        }
+
         public LoadCompConf build() {
-            return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad);
+            return new LoadCompConf(id, parallelism, streams, cpuLoad, memoryLoad, slp);
         }
     }
 
@@ -217,7 +244,8 @@ public class LoadCompConf {
      * @param parallelism the parallelism of the component.
      * @param streams the output streams of the component.
      */
-    public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad) {
+    public LoadCompConf(String id, int parallelism, List<OutputStream> streams, double cpuLoad, double memoryLoad,
+                        SlowExecutorPattern slp) {
         this.id = id;
         if (id == null) {
             throw new IllegalArgumentException("A spout ID cannot be null");
@@ -226,5 +254,6 @@ public class LoadCompConf {
         this.streams = streams;
         this.cpuLoad = cpuLoad;
         this.memoryLoad = memoryLoad;
+        this.slp = slp;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
index 10ec698..33b4def 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java
@@ -586,7 +586,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
     }
 
 
-    static class FixedWidthReporter extends  ColumnsFileReporter {
+    static class FixedWidthReporter extends ColumnsFileReporter {
         public final String longFormat;
         public final String stringFormat;
 
@@ -649,7 +649,7 @@ public class LoadMetricsServer extends HttpForwardingMetricsServer {
         }
     }
 
-    static class SepValReporter extends  ColumnsFileReporter {
+    static class SepValReporter extends ColumnsFileReporter {
         private final String separator;
 
         public SepValReporter(String separator, String path, Map<String, String> query, Map<String, MetricExtractor> extractorsMap)

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
index 20b2926..8edf660 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java
@@ -123,7 +123,7 @@ public class NormalDistStats implements Serializable {
     }
 
     /**
-     * Generate a random number that follows the statistical distribution
+     * Generate a random number that follows the statistical distribution.
      * @param rand the random number generator to use
      * @return the next number that should follow the statistical distribution.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
index 99357d9..29566b1 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStream.java
@@ -105,7 +105,7 @@ public class OutputStream implements Serializable {
     }
 
     /**
-     * Create a new stream with stats
+     * Create a new stream with stats.
      * @param id the id of the stream
      * @param rate the rate of tuples being emitted on this stream
      * @param areKeysSkewed true if keys are skewed else false.  For skewed keys

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
new file mode 100644
index 0000000..d2c3ac5
--- /dev/null
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/SlowExecutorPattern.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.storm.utils.ObjectReader;
+
+/**
+ * A repeating pattern of skewedness in processing times.  This is used to simulate an executor that slows down.
+ */
+public class SlowExecutorPattern implements Serializable {
+    private static final Pattern PARSER = Pattern.compile("\\s*(?<slowness>[^:]+)\\s*(?::\\s*(?<count>[0-9]+))?\\s*");
+    public final double maxSlownessMs;
+    public final int count;
+
+    /**
+     * Parses a string (command line) representation of "&lt;SLOWNESS&gt;(:&lt;COUNT&gt;)?".
+     * @param strRepresentation the string representation to parse
+     * @return the corresponding SlowExecutorPattern.
+     */
+    public static SlowExecutorPattern fromString(String strRepresentation) {
+        Matcher m = PARSER.matcher(strRepresentation);
+        if (!m.matches()) {
+            throw new IllegalArgumentException(strRepresentation + " is not in the form <SLOWNESS>(:<COUNT>)?");
+        }
+        double slownessMs = Double.valueOf(m.group("slowness"));
+        String c = m.group("count");
+        int count = c == null ? 1 : Integer.valueOf(c);
+        return new SlowExecutorPattern(slownessMs, count);
+    }
+
+    /**
+     * Creates a SlowExecutorPattern from a Map config.
+     * @param conf the conf to parse.
+     * @return the corresponding SlowExecutorPattern.
+     */
+    public static SlowExecutorPattern fromConf(Map<String, Object> conf) {
+        double slowness = ObjectReader.getDouble(conf.get("slownessMs"), 0.0);
+        int count = ObjectReader.getInt(conf.get("count"), 1);
+        return new SlowExecutorPattern(slowness, count);
+    }
+
+    /**
+     * Convert this to a Config map.
+     * @return the corresponding Config map to this.
+     */
+    public Map<String, Object> toConf() {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("slownessMs", maxSlownessMs);
+        ret.put("count", count);
+        return ret;
+    }
+
+    public SlowExecutorPattern(double maxSlownessMs, int count) {
+        this.count = count;
+        this.maxSlownessMs = maxSlownessMs;
+    }
+
+    public double getExtraSlowness(int index) {
+        return (index >= count) ? 0 : maxSlownessMs;
+    }
+
+}
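
Note on the "<SLOWNESS>(:<COUNT>)?" format accepted by SlowExecutorPattern.fromString: the following is a minimal standalone sketch, not part of the commit, of how the pattern appears to behave. The regular expression is copied from PARSER above; the class name SlowPatternSketch and the method names parse and extraSlowness are hypothetical stand-ins so the example runs without Storm on the classpath.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone demo that mirrors SlowExecutorPattern; not part of the commit.
class SlowPatternSketch {
    // Same expression as PARSER in SlowExecutorPattern.
    static final Pattern PARSER =
        Pattern.compile("\\s*(?<slowness>[^:]+)\\s*(?::\\s*(?<count>[0-9]+))?\\s*");

    final double maxSlownessMs;
    final int count;

    SlowPatternSketch(double maxSlownessMs, int count) {
        this.maxSlownessMs = maxSlownessMs;
        this.count = count;
    }

    // Parses "MS" or "MS:COUNT"; COUNT defaults to 1 when it is omitted.
    static SlowPatternSketch parse(String s) {
        Matcher m = PARSER.matcher(s);
        if (!m.matches()) {
            throw new IllegalArgumentException(s + " is not in the form <SLOWNESS>(:<COUNT>)?");
        }
        double slowness = Double.parseDouble(m.group("slowness"));
        String c = m.group("count");
        return new SlowPatternSketch(slowness, c == null ? 1 : Integer.parseInt(c));
    }

    // Only executors whose index is below count get the extra delay.
    double extraSlowness(int index) {
        return index >= count ? 0 : maxSlownessMs;
    }

    public static void main(String[] args) {
        SlowPatternSketch p = parse("2.5:3");
        for (int i = 0; i < 5; i++) {
            System.out.println("executor " + i + " -> " + p.extraSlowness(i) + " ms");
        }
    }
}
```

With "2.5:3", executor indexes 0 through 2 are slowed by 2.5 ms and every later index gets no extra delay. A bare "10" slows only index 0.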

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
index 3d3a271..3af5803 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java
@@ -21,7 +21,6 @@ package org.apache.storm.loadgen;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -32,6 +31,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.BasicOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
@@ -40,6 +40,8 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,12 +88,28 @@ public class ThroughputVsLatency {
     }
 
     public static class SplitSentence extends BaseBasicBolt {
+        private ExecAndProcessLatencyEngine sleep;
+        private int executorIndex;
+
+        public SplitSentence(SlowExecutorPattern slowness) {
+            super();
+            sleep = new ExecAndProcessLatencyEngine(slowness);
+        }
+
+        @Override
+        public void prepare(Map stormConf, TopologyContext context) {
+            executorIndex = context.getThisTaskIndex();
+            sleep.prepare();
+        }
+
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
-                collector.emit(new Values(word, 1));
-            }
+            sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null , () -> {
+                String sentence = tuple.getString(0);
+                for (String word: sentence.split("\\s+")) {
+                    collector.emit(new Values(word, 1));
+                }
+            });
         }
 
         @Override
@@ -163,6 +181,13 @@ public class ThroughputVsLatency {
             .desc("Number of splitter bolts to use (defaults to " + DEFAULT_NUM_SPLITS + ")")
             .build());
         options.addOption(Option.builder()
+            .longOpt("splitter-imbalance")
+            .argName("MS(:COUNT)?")
+            .hasArg()
+            .desc("The number of ms that the first COUNT splitters will wait before processing.  This creates an imbalance "
+                + "that helps test load aware groupings (defaults to 0:1)")
+            .build());
+        options.addOption(Option.builder()
             .longOpt("counters")
             .argName("NUM")
             .hasArg()
@@ -172,6 +197,7 @@ public class ThroughputVsLatency {
         CommandLineParser parser = new DefaultParser();
         CommandLine cmd = null;
         Exception commandLineException = null;
+        SlowExecutorPattern slowness = null;
         double numMins = TEST_EXECUTE_TIME_DEFAULT;
         double ratePerSecond = DEFAULT_RATE_PER_SECOND;
         String name = DEFAULT_TOPO_NAME;
@@ -198,6 +224,9 @@ public class ThroughputVsLatency {
             if (cmd.hasOption("counters")) {
                 numCounts = Integer.parseInt(cmd.getOptionValue("counters"));
             }
+            if (cmd.hasOption("splitter-imbalance")) {
+                slowness = SlowExecutorPattern.fromString(cmd.getOptionValue("splitter-imbalance"));
+            }
         } catch (ParseException | NumberFormatException e) {
             commandLineException = e;
         }
@@ -216,11 +245,12 @@ public class ThroughputVsLatency {
         metrics.put("count_parallel", numCounts);
 
         Config conf = new Config();
-        LoadMetricsServer metricServer = new LoadMetricsServer(conf, cmd, metrics);
+        Map<String, Object> sysConf = Utils.readStormConfig();
+        LoadMetricsServer metricServer = new LoadMetricsServer(sysConf, cmd, metrics);
         metricServer.serve();
         String url = metricServer.getUrl();
 
-        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        NimbusClient client = NimbusClient.getConfiguredClient(sysConf);
         conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
         conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1);
         Map<String, String> workerMetrics = new HashMap<>();
@@ -238,7 +268,8 @@ public class ThroughputVsLatency {
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout("spout", new FastRandomSentenceSpout((long) ratePerSecond / numSpouts), numSpouts);
-        builder.setBolt("split", new SplitSentence(), numSplits).shuffleGrouping("spout");
+        builder.setBolt("split", new SplitSentence(slowness), numSplits)
+            .shuffleGrouping("spout");
         builder.setBolt("count", new WordCount(), numCounts).fieldsGrouping("split", new Fields("word"));
 
         int exitStatus = -1;

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
index b171972..4f297ab 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java
@@ -228,7 +228,7 @@ public class TopologyLoadConf {
     }
 
     /**
-     * The first one that is not null
+     * The first one that is not null.
      * @param rest all the other somethings
      * @param <V> whatever type you want.
      * @return the first one that is not null
@@ -266,6 +266,15 @@ public class TopologyLoadConf {
         return ret;
     }
 
+    private LoadCompConf overrideCompSlowExec(LoadCompConf comp, Map<String, SlowExecutorPattern> topoSpecific) {
+        LoadCompConf ret = comp;
+        SlowExecutorPattern slp = topoSpecific.get(name + ":" + comp.id);
+        if (slp != null) {
+            ret = ret.overrideSlowExecutorPattern(slp);
+        }
+        return ret;
+    }
+
     /**
      * Scale all of the components in the topology by a percentage (but keep the throughput the same).
      * @param v the amount to scale them by.  1.0 is nothing, 0.5 cuts them in half, 2.0 doubles them.
@@ -299,6 +308,22 @@ public class TopologyLoadConf {
     }
 
     /**
+     * Override the SlowExecutorPattern for given components.
+     * @param topoSpecific what we are going to use to override.
+     * @return a copy of this with the needed adjustments made.
+     */
+    public TopologyLoadConf overrideSlowExecs(Map<String, SlowExecutorPattern> topoSpecific) {
+        if (topoSpecific == null || topoSpecific.isEmpty()) {
+            return this;
+        }
+        List<LoadCompConf> modedSpouts = spouts.stream().map((s) -> overrideCompSlowExec(s, topoSpecific))
+            .collect(Collectors.toList());
+        List<LoadCompConf> modedBolts = bolts.stream().map((b) -> overrideCompSlowExec(b, topoSpecific))
+            .collect(Collectors.toList());
+        return new TopologyLoadConf(name, topoConf, modedSpouts, modedBolts, streams);
+    }
+
+    /**
      * Create a new version of this topology with identifiable information removed.
      * @return the anonymized version of the TopologyLoadConf.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 0bc56fb..658aadc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -62,7 +62,6 @@ public class Task {
     private TopologyContext systemTopologyContext;
     private TopologyContext userTopologyContext;
     private WorkerTopologyContext workerTopologyContext;
-    private LoadMapping loadMapping;
     private Integer taskId;
     private String componentId;
     private Object taskObject; // Spout/Bolt object
@@ -84,7 +83,6 @@ public class Task {
         this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
         this.workerTopologyContext = executor.getWorkerTopologyContext();
         this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
-        this.loadMapping = workerData.getLoadMapping();
         this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
         this.userTopologyContext = mkTopologyContext(workerData.getTopology());
         this.taskObject = mkTaskObject();

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 378099c..9e7bd0b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -286,7 +286,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     }
 
     public void doRefreshLoad() {
-        workerState.refreshLoad();
+        workerState.refreshLoad(executorsAtom.get());
 
         final List<IRunningExecutor> executors = executorsAtom.get();
         for (IRunningExecutor executor : executors) {

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index d244491..33ea579 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -28,6 +28,7 @@ import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.executor.IRunningExecutor;
 import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.DebugOptions;
 import org.apache.storm.generated.Grouping;
@@ -444,15 +445,20 @@ public class WorkerState {
         this.throttleOn.set(backpressure);
     }
 
-    public void refreshLoad() {
-        Set<Integer> remoteTasks = Sets.difference(new HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
+    private static double getQueueLoad(DisruptorQueue q) {
+        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+        return ((double) qMetrics.population()) / qMetrics.capacity();
+    }
+
+    public void refreshLoad(List<IRunningExecutor> execs) {
+        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
         Long now = System.currentTimeMillis();
-        Map<Integer, Double> localLoad = shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
-            (Function<Map.Entry<Integer, DisruptorQueue>, Integer>) Map.Entry::getKey,
-            (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry -> {
-                DisruptorQueue.QueueMetrics qMetrics = entry.getValue().getMetrics();
-                return ( (double) qMetrics.population()) / qMetrics.capacity();
-            }));
+        Map<Integer, Double> localLoad = new HashMap<>();
+        for (IRunningExecutor exec: execs) {
+            double receiveLoad = getQueueLoad(exec.getReceiveQueue());
+            double sendLoad = getQueueLoad(exec.getSendQueue());
+            localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
+        }
 
         Map<Integer, Load> remoteLoad = new HashMap<>();
         cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
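
Note on the new local load calculation in refreshLoad: each executor's load is now the larger of its receive-queue and send-queue fill ratios, rather than the receive-queue ratio alone. The following is a minimal sketch of that arithmetic, assuming plain population and capacity numbers in place of DisruptorQueue.QueueMetrics; the class name QueueLoadSketch and the method names queueLoad and executorLoad are hypothetical.

```java
// Hypothetical standalone demo of the load arithmetic; not part of the commit.
class QueueLoadSketch {
    // Fill ratio of one queue, in the range 0.0 (empty) to 1.0 (full).
    static double queueLoad(long population, long capacity) {
        return ((double) population) / capacity;
    }

    // An executor counts as loaded if either of its queues is backing up.
    static double executorLoad(long recvPop, long recvCap, long sendPop, long sendCap) {
        return Math.max(queueLoad(recvPop, recvCap), queueLoad(sendPop, sendCap));
    }

    public static void main(String[] args) {
        // Receive queue is a quarter full, send queue is three quarters full.
        System.out.println(executorLoad(256, 1024, 768, 1024));
    }
}
```

Taking the maximum means an executor that drains its input quickly but cannot push tuples downstream is still reported as congested.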

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index b787fbb..e55aca0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -115,7 +115,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
 
     protected final IReportError reportError;
     protected final Random rand;
-    protected final DisruptorQueue transferQueue;
+    protected final DisruptorQueue sendQueue;
     protected final DisruptorQueue receiveQueue;
     protected Map<Integer, Task> idToTask;
     protected final Map<String, String> credentials;
@@ -140,8 +140,8 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         this.stormActive = workerData.getIsTopologyActive();
         this.stormComponentDebug = workerData.getStormComponentToDebug();
 
-        this.transferQueue = mkExecutorBatchQueue(topoConf, executorId);
-        this.executorTransfer = new ExecutorTransfer(workerData, transferQueue, topoConf);
+        this.sendQueue = mkExecutorBatchQueue(topoConf, executorId);
+        this.executorTransfer = new ExecutorTransfer(workerData, sendQueue, topoConf);
 
         this.suicideFn = workerData.getSuicideCallback();
         try {
@@ -253,7 +253,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         setupTicks(StatsUtil.SPOUT.equals(type));
 
         LOG.info("Finished loading executor " + componentId + ":" + executorId);
-        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask, receiveQueue, sendQueue);
     }
 
     public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
@@ -562,7 +562,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
     }
 
     public DisruptorQueue getTransferWorkerQueue() {
-        return transferQueue;
+        return sendQueue;
     }
 
     public IStormClusterState getStormClusterState() {

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 7ea48b0..c7691e4 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -31,6 +31,7 @@ import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DisruptorQueue;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,11 +45,16 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
     private final Executor executor;
     private final List<Utils.SmartThread> threads;
     private final Map<Integer, Task> taskDatas;
+    private final DisruptorQueue receiveQueue;
+    private final DisruptorQueue sendQueue;
 
-    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas,
+                            DisruptorQueue receiveQueue, DisruptorQueue sendQueue) {
         this.executor = executor;
         this.threads = threads;
         this.taskDatas = taskDatas;
+        this.receiveQueue = receiveQueue;
+        this.sendQueue = sendQueue;
     }
 
     @Override
@@ -80,6 +86,16 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
     }
 
     @Override
+    public DisruptorQueue getReceiveQueue() {
+        return receiveQueue;
+    }
+
+    @Override
+    public DisruptorQueue getSendQueue() {
+        return sendQueue;
+    }
+
+    @Override
     public void shutdown() {
         try {
             LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index e7a4117..4b7f483 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -22,6 +22,7 @@ import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.grouping.LoadMapping;
 
 import java.util.List;
+import org.apache.storm.utils.DisruptorQueue;
 
 public interface IRunningExecutor {
 
@@ -30,4 +31,6 @@ public interface IRunningExecutor {
     void credentialsChanged(Credentials credentials);
     void loadChanged(LoadMapping loadMapping);
     boolean getBackPressureFlag();
+    DisruptorQueue getReceiveQueue();
+    DisruptorQueue getSendQueue();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 5d9edf1..4e46dc5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -70,7 +70,7 @@ public class BoltExecutor extends Executor {
                 ((ICredentialsListener) boltObject).setCredentials(credentials);
             }
             if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
-                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue,
                         "transfer", workerData.getTransferQueue());
                 BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
 
@@ -78,7 +78,7 @@ public class BoltExecutor extends Executor {
                 BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
                 BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
             } else {
-                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+                Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
                 BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 6e85f34..c465338 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -118,7 +118,7 @@ public class SpoutExecutor extends Executor {
             this.outputCollectors.add(outputCollector);
 
             taskData.getBuiltInMetrics().registerAll(topoConf, taskData.getUserContext());
-            Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+            Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", sendQueue, "receive", receiveQueue);
             BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
 
             if (spoutObject instanceof ICredentialsListener) {
@@ -152,7 +152,7 @@ public class SpoutExecutor extends Executor {
                             spout.activate();
                         }
                     }
-                    if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
+                    if (!sendQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
                         for (ISpout spout : spouts) {
                             spout.nextTuple();
                         }

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 0c0560a..f5b63ec 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -15,61 +15,61 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.grouping;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
-    private static final int CAPACITY = 1000;
+    static final int CAPACITY = 1000;
+    private static final int MAX_WEIGHT = 100;
+    private static class IndexAndWeights {
+        final int index;
+        int weight;
+
+        IndexAndWeights(int index) {
+            this.index = index;
+            weight = MAX_WEIGHT;
+        }
+    }
 
+    private final Map<Integer, IndexAndWeights> orig = new HashMap<>();
     private Random random;
-    private List<Integer>[] rets;
-    private int[] targets;
-    private int[] loads;
-    private int[] unassigned;
-    private int[] choices;
-    private int[] prepareChoices;
+    @VisibleForTesting
+    List<Integer>[] rets;
+    @VisibleForTesting
+    volatile int[] choices;
+    private volatile int[] prepareChoices;
     private AtomicInteger current;
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         random = new Random();
 
-        rets = (List<Integer>[])new List<?>[targetTasks.size()];
-        targets = new int[targetTasks.size()];
-        for (int i = 0; i < targets.length; i++) {
-            rets[i] = Arrays.asList(targetTasks.get(i));
-            targets[i] = targetTasks.get(i);
+        rets = (List<Integer>[]) new List<?>[targetTasks.size()];
+        int i = 0;
+        for (int target : targetTasks) {
+            rets[i] = Arrays.asList(target);
+            orig.put(target, new IndexAndWeights(i));
+            i++;
         }
 
         // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
         choices = new int[CAPACITY];
 
-        for (int i = 0 ; i < CAPACITY ; i++) {
-            choices[i] = i % rets.length;
-        }
-
-        shuffleArray(choices);
-        current = new AtomicInteger(-1);
-
+        current = new AtomicInteger(0);
         // allocate another array to be switched
         prepareChoices = new int[CAPACITY];
-
-        // allocating only once
-        loads = new int[targets.length];
-        unassigned = new int[targets.length];
+        updateRing(null);
     }
 
     @Override
@@ -93,48 +93,44 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         updateRing(loadMapping);
     }
 
-    private void updateRing(LoadMapping load) {
-        int localTotal = 0;
-        for (int i = 0 ; i < targets.length; i++) {
-            int val = (int)(101 - (load.get(targets[i]) * 100));
-            loads[i] = val;
-            localTotal += val;
+    private synchronized void updateRing(LoadMapping load) {
+        //We will adjust weights based off of the minimum load
+        double min = load == null ? 0 : orig.keySet().stream().mapToDouble((key) -> load.get(key)).min().getAsDouble();
+        for (Map.Entry<Integer, IndexAndWeights> target: orig.entrySet()) {
+            IndexAndWeights val = target.getValue();
+            double l = load == null ? 0.0 : load.get(target.getKey());
+            if (l <= min + (0.05)) {
+                //We assume that within 5% of the minimum congestion is still fine.
+                //Not congested we grow (but slowly)
+                val.weight = Math.min(MAX_WEIGHT, val.weight + 1);
+            } else {
+                //Congested we contract much more quickly
+                val.weight = Math.max(0, val.weight - 10);
+            }
         }
+        //Now we need to build the array
+        long weightSum = orig.values().stream().mapToLong((w) -> w.weight).sum();
+        //Now we can calculate a percentage
 
         int currentIdx = 0;
-        int unassignedIdx = 0;
-        for (int i = 0 ; i < loads.length ; i++) {
-            if (currentIdx == CAPACITY) {
-                break;
-            }
-
-            int loadForTask = loads[i];
-            int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
-            // assign at least one for task
-            if (amount == 0) {
-                unassigned[unassignedIdx++] = i;
-            }
-            for (int j = 0; j < amount; j++) {
-                if (currentIdx == CAPACITY) {
-                    break;
+        if (weightSum > 0) {
+            for (IndexAndWeights indexAndWeights : orig.values()) {
+                int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY);
+                for (int i = 0; i < count && currentIdx < CAPACITY; i++) {
+                    prepareChoices[currentIdx] = indexAndWeights.index;
+                    currentIdx++;
                 }
-
-                prepareChoices[currentIdx++] = i;
             }
-        }
 
-        if (currentIdx < CAPACITY) {
-            // if there're some rooms, give unassigned tasks a chance to be included
-            // this should be really small amount, so just add them sequentially
-            if (unassignedIdx > 0) {
-                for (int i = currentIdx ; i < CAPACITY ; i++) {
-                    prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
-                }
-            } else {
-                // just pick random
-                for (int i = currentIdx ; i < CAPACITY ; i++) {
-                    prepareChoices[i] = random.nextInt(loads.length);
-                }
+            //in case we didn't fill in enough
+            for (; currentIdx < CAPACITY; currentIdx++) {
+                prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+            }
+        } else {
+            //This really should be impossible, because we go off of the min load, and inc anything within 5% of it.
+            // But just to be sure it is never an issue, especially with float rounding etc.
+            for (;currentIdx < CAPACITY; currentIdx++) {
+                prepareChoices[currentIdx] = currentIdx % rets.length;
             }
         }
 
@@ -160,5 +156,4 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         arr[i] = arr[j];
         arr[j] = tmp;
     }
-
-}
+}
\ No newline at end of file
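
Note on the new updateRing logic: the weights grow by 1 per refresh (up to MAX_WEIGHT) for targets within 0.05 of the minimum load and shrink by 10 otherwise, and the ring of CAPACITY slots is then filled in proportion to the weights. The following is a simplified standalone sketch of that scheme, not the committed code: it indexes targets by array position instead of task id, takes a plain double[] instead of a LoadMapping, and omits the prepareChoices/choices swap and any shuffling that follow in the real class. The class name LoadRingSketch and the method names update and countFor are hypothetical.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical standalone demo of the weight and ring update; not part of the commit.
class LoadRingSketch {
    static final int CAPACITY = 1000;
    static final int MAX_WEIGHT = 100;

    final int[] weights;
    final int[] choices = new int[CAPACITY];
    final Random random = new Random();

    LoadRingSketch(int numTargets) {
        weights = new int[numTargets];
        Arrays.fill(weights, MAX_WEIGHT);
        update(null); // a null load is treated as "every target is idle"
    }

    void update(double[] load) {
        double min = 0.0;
        if (load != null) {
            min = Double.MAX_VALUE;
            for (double l : load) {
                min = Math.min(min, l);
            }
        }
        long sum = 0;
        for (int i = 0; i < weights.length; i++) {
            double l = load == null ? 0.0 : load[i];
            if (l <= min + 0.05) {
                weights[i] = Math.min(MAX_WEIGHT, weights[i] + 1); // recover slowly
            } else {
                weights[i] = Math.max(0, weights[i] - 10);         // back off quickly
            }
            sum += weights[i];
        }
        int idx = 0;
        if (sum > 0) {
            for (int i = 0; i < weights.length; i++) {
                int count = (int) ((weights[i] / (double) sum) * CAPACITY);
                for (int j = 0; j < count && idx < CAPACITY; j++) {
                    choices[idx++] = i;
                }
            }
            // Truncation can leave a few slots; copy a random earlier slot into each.
            // This assumes at least one slot was filled, which holds for up to CAPACITY targets.
            for (; idx < CAPACITY; idx++) {
                choices[idx] = choices[random.nextInt(idx)];
            }
        } else {
            for (; idx < CAPACITY; idx++) {
                choices[idx] = idx % weights.length;
            }
        }
    }

    // Number of ring slots currently assigned to the given target index.
    int countFor(int target) {
        int n = 0;
        for (int c : choices) {
            if (c == target) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        LoadRingSketch ring = new LoadRingSketch(2);
        double[] load = {1.0, 0.0}; // target 0 is fully loaded, target 1 is idle
        for (int i = 0; i < 10; i++) {
            ring.update(load);
            System.out.println("weights=" + Arrays.toString(ring.weights)
                + " slots for target 0=" + ring.countFor(0));
        }
    }
}
```

In this sketch a fully loaded target drops from weight 100 to 0 in ten refreshes, while recovering to 100 takes a hundred, which matches the "grow slowly, contract quickly" comments in the diff.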

http://git-wip-us.apache.org/repos/asf/storm/blob/b9f1d7ef/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 53ac404..a5f9304 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -19,12 +19,12 @@ package org.apache.storm.grouping;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.storm.StormTimer;
+import java.util.Arrays;
 import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.task.WorkerTopologyContext;
-import org.apache.storm.utils.Utils;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -55,6 +55,64 @@ public class LoadAwareShuffleGroupingTest {
     private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
 
     @Test
+    public void testUnevenLoadOverTime() throws Exception {
+        LoadAwareShuffleGrouping grouping = new LoadAwareShuffleGrouping();
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouping.prepare(context, new GlobalStreamId("a", "default"), Arrays.asList(1, 2));
+        double expectedOneWeight = 100.0;
+        double expectedTwoWeight = 100.0;
+
+        Map<Integer, Double> localLoad = new HashMap<>();
+        localLoad.put(1, 1.0);
+        localLoad.put(2, 0.0);
+        LoadMapping lm = new LoadMapping();
+        lm.setLocal(localLoad);
+        //First verify that if something has a high load its distribution will drop over time
+        for (int i = 9; i >= 0; i--) {
+            grouping.refreshLoad(lm);
+            expectedOneWeight -= 10.0;
+            Map<Integer, Double> countByType = count(grouping.choices, grouping.rets);
+            LOG.info("countByType = {}", countByType);
+            double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
+            double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
+            assertEquals("i = " + i,
+                expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                0.01);
+            assertEquals("i = " + i,
+                expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                0.01);
+        }
+
+        //Now verify that when it is switched we can recover
+        localLoad.put(1, 0.0);
+        localLoad.put(2, 1.0);
+        lm.setLocal(localLoad);
+
+        while (expectedOneWeight < 100.0) {
+            grouping.refreshLoad(lm);
+            expectedOneWeight += 1.0;
+            expectedTwoWeight = Math.max(0.0, expectedTwoWeight - 10.0);
+            Map<Integer, Double> countByType = count(grouping.choices, grouping.rets);
+            LOG.info("countByType = {}", countByType);
+            double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
+            double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
+            assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                0.01);
+            assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                0.01);
+        }
+    }
+
+    private Map<Integer,Double> count(int[] choices, List<Integer>[] rets) {
+        Map<Integer, Double> ret = new HashMap<>();
+        for (int i : choices) {
+            int task = rets[i].get(0);
+            ret.put(task, ret.getOrDefault(task, 0.0) + 1);
+        }
+        return ret;
+    }
+
+    @Test
     public void testLoadAwareShuffleGroupingWithEvenLoad() {
         // just pick arbitrary number
         final int numTasks = 7;
@@ -161,36 +219,6 @@ public class LoadAwareShuffleGroupingTest {
     }
 
     @Test
-    public void testLoadAwareShuffleGroupingWithUnevenLoad() {
-        // just pick arbitrary number
-        final int numTasks = 7;
-        final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
-
-        // Define our taskIds and loads
-        final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-        final LoadMapping loadMapping = buildLocalTasksUnevenLoadMapping(availableTaskIds);
-
-        runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
-            loadMapping);
-    }
-
-    @Test
-    public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
-        for (int trial = 0 ; trial < 200 ; trial++) {
-            // just pick arbitrary number in 5 ~ 100
-            final int numTasks = new Random().nextInt(96) + 5;
-            final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
-
-            // Define our taskIds and loads
-            final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-            final LoadMapping loadMapping = buildLocalTasksRandomLoadMapping(availableTaskIds);
-
-            runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
-                loadMapping);
-        }
-    }
-
-    @Test
     public void testShuffleLoadEven() {
         // port test-shuffle-load-even
         LoadAwareCustomStreamGrouping shuffler = GrouperFactory
@@ -228,46 +256,6 @@ public class LoadAwareShuffleGroupingTest {
         assertTrue(load2 <= maxPrCount);
     }
 
-    @Test
-    public void testShuffleLoadUneven() {
-        // port test-shuffle-load-uneven
-        LoadAwareCustomStreamGrouping shuffler = GrouperFactory
-            .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
-                Lists.newArrayList(1, 2), Collections.emptyMap());
-        int numMessages = 100000;
-        int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
-        int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
-        int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
-        int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
-        LoadMapping load = new LoadMapping();
-        Map<Integer, Double> loadInfoMap = new HashMap<>();
-        loadInfoMap.put(1, 0.5);
-        loadInfoMap.put(2, 0.0);
-        load.setLocal(loadInfoMap);
-
-        // force triggers building ring
-        shuffler.refreshLoad(load);
-
-        List<Object> data = Lists.newArrayList(1, 2);
-        int[] frequencies = new int[3]; // task id starts from 1
-        for (int i = 0 ; i < numMessages ; i++) {
-            List<Integer> tasks = shuffler.chooseTasks(1, data);
-            for (int task : tasks) {
-                frequencies[task]++;
-            }
-        }
-
-        int load1 = frequencies[1];
-        int load2 = frequencies[2];
-
-        LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
-
-        assertTrue(load1 >= min1PrCount);
-        assertTrue(load1 <= max1PrCount);
-        assertTrue(load2 >= min2PrCount);
-        assertTrue(load2 <= max2PrCount);
-    }
-
     @Ignore
     @Test
     public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
@@ -374,37 +362,6 @@ public class LoadAwareShuffleGroupingTest {
         return taskCounts;
     }
 
-    private void runDistributionVerificationTestWithUnevenLoad(int numTasks,
-        LoadAwareShuffleGrouping grouper, List<Integer> availableTaskIds,
-        LoadMapping loadMapping) {
-        int[] loads = new int[numTasks];
-        int localTotal = 0;
-        List<Double> loadRate = new ArrayList<>();
-        for (int i = 0; i < numTasks; i++) {
-            int val = (int)(101 - (loadMapping.get(i) * 100));
-            loads[i] = val;
-            localTotal += val;
-        }
-
-        for (int i = 0; i < numTasks; i++) {
-            loadRate.add(loads[i] * 1.0 / localTotal);
-        }
-
-        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-        grouper.prepare(context, null, availableTaskIds);
-
-        // Keep track of how many times we see each taskId
-        int totalEmits = 5000 * numTasks;
-        int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
-
-        int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
-        for (int i = 0; i < numTasks; i++) {
-            int expected = (int) (totalEmits * loadRate.get(i));
-            assertTrue("Distribution should respect the task load with small delta",
-                taskCounts[i] >= expected - delta && taskCounts[i] <= expected + delta);
-        }
-    }
-
     private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
         List<Integer> availableTaskIds, LoadMapping loadMapping) {
         // Task Id not used, so just pick a static value
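
Note on the behaviour exercised by testUnevenLoadOverTime above: the test expects each task to start at a weight of 100, a heavily loaded task to lose 10 per refreshLoad() call (never going below 0), and a lightly loaded task to gain 1 per call (never going above 100). The comment in LoadAwareShuffleGrouping says tasks "within 5%" of the minimum load are the ones incremented. The following is only a minimal sketch of that update rule under those assumptions, not the code from this commit. The names LoadWeightSketch, updateWeights, and TOLERANCE are hypothetical, and the exact "within 5%" comparison is a guess.

```java
import java.util.Arrays;

public class LoadWeightSketch {
    static final int MAX_WEIGHT = 100;     // starting and maximum weight, as the test expects
    static final int MIN_WEIGHT = 0;       // the test never expects a weight below zero
    static final double TOLERANCE = 0.05;  // assumption: "within 5%" read as an absolute margin

    /** Adjusts the weights in place using the most recent per-task loads. */
    public static void updateWeights(int[] weights, double[] loads) {
        double minLoad = Arrays.stream(loads).min().orElse(0.0);
        for (int i = 0; i < weights.length; i++) {
            if (loads[i] <= minLoad + TOLERANCE) {
                // Close to the least loaded task: recover slowly.
                weights[i] = Math.min(MAX_WEIGHT, weights[i] + 1);
            } else {
                // Noticeably more loaded: back off quickly.
                weights[i] = Math.max(MIN_WEIGHT, weights[i] - 10);
            }
        }
    }

    public static void main(String[] args) {
        int[] weights = {100, 100};
        double[] loads = {1.0, 0.0};       // task 1 busy, task 2 idle
        for (int i = 0; i < 10; i++) {
            updateWeights(weights, loads);
        }
        System.out.println(Arrays.toString(weights)); // prints [0, 100]

        loads = new double[] {0.0, 1.0};   // loads swapped, as in the second half of the test
        updateWeights(weights, loads);
        System.out.println(Arrays.toString(weights)); // prints [1, 90]
    }
}
```

The asymmetric step sizes (fast back-off, slow recovery) match the expectations in the test: a task that was overloaded needs about 100 refreshes to regain its full share, while a newly overloaded task loses its share in 10.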


[2/2] storm git commit: Merge branch 'STORM-2733' of https://github.com/revans2/incubator-storm into STORM-2733

Posted by bo...@apache.org.
Merge branch 'STORM-2733' of https://github.com/revans2/incubator-storm into STORM-2733

STORM-2733: Better load aware shuffle implementation

This closes #2321


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/124acb92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/124acb92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/124acb92

Branch: refs/heads/master
Commit: 124acb92dff04a57b530ab4d95a698abc8ff46d9
Parents: 7901051 b9f1d7e
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 14 10:15:16 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 14 10:15:16 2017 -0500

----------------------------------------------------------------------
 examples/storm-loadgen/README.md                |   5 +
 examples/storm-loadgen/pom.xml                  |   2 +-
 .../loadgen/ExecAndProcessLatencyEngine.java    | 119 ++++++++++++++
 .../java/org/apache/storm/loadgen/GenLoad.java  |  25 +++
 .../java/org/apache/storm/loadgen/LoadBolt.java |  63 +------
 .../org/apache/storm/loadgen/LoadCompConf.java  |  43 ++++-
 .../apache/storm/loadgen/LoadMetricsServer.java |   4 +-
 .../apache/storm/loadgen/NormalDistStats.java   |   2 +-
 .../org/apache/storm/loadgen/OutputStream.java  |   2 +-
 .../storm/loadgen/SlowExecutorPattern.java      |  83 ++++++++++
 .../storm/loadgen/ThroughputVsLatency.java      |  47 +++++-
 .../apache/storm/loadgen/TopologyLoadConf.java  |  27 ++-
 .../src/jvm/org/apache/storm/daemon/Task.java   |   2 -
 .../org/apache/storm/daemon/worker/Worker.java  |   2 +-
 .../apache/storm/daemon/worker/WorkerState.java |  22 ++-
 .../jvm/org/apache/storm/executor/Executor.java |  10 +-
 .../apache/storm/executor/ExecutorShutdown.java |  18 +-
 .../apache/storm/executor/IRunningExecutor.java |   3 +
 .../storm/executor/bolt/BoltExecutor.java       |   4 +-
 .../storm/executor/spout/SpoutExecutor.java     |   4 +-
 .../grouping/LoadAwareShuffleGrouping.java      | 129 +++++++--------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 163 +++++++------------
 22 files changed, 512 insertions(+), 267 deletions(-)
----------------------------------------------------------------------