You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/10 00:12:24 UTC

[1/2] storm git commit: STORM-2940 dynamically set the CAPACITY value of LoadAwareShuffleGrouping

Repository: storm
Updated Branches:
  refs/heads/master a715e9a5c -> 0b3203239


STORM-2940 dynamically set the CAPACITY value of LoadAwareShuffleGrouping

* lose granularity if the number of target tasks is larger than 1000
* add tests

This closes #2551


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b332503b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b332503b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b332503b

Branch: refs/heads/master
Commit: b332503b113fa6cde6afa320c9c190e3aad5b4c3
Parents: a715e9a
Author: Ethan Li <et...@gmail.com>
Authored: Wed Feb 7 13:40:45 2018 -0600
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 10 09:11:37 2018 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 35 ++++++++++++--------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 33 +++++++++++++-----
 2 files changed, 46 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 3fd75e5..832ba7f 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
-    static final int CAPACITY = 1000;
+    private int capacity;
     private static final int MAX_WEIGHT = 100;
     private static class IndexAndWeights {
         final int index;
@@ -85,6 +85,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         sourceNodeInfo = new NodeInfo(context.getThisWorkerHost(), Sets.newHashSet((long) context.getThisWorkerPort()));
         taskToNodePort = context.getTaskToNodePort();
         this.targetTasks = targetTasks;
+        capacity = targetTasks.size() == 1 ? 1 : Math.max(1000, targetTasks.size() * 5);
         conf = context.getConf();
         dnsToSwitchMapping = ReflectionUtils.newInstance((String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN));
         localityGroup = new HashMap<>();
@@ -101,11 +102,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
 
         // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
-        choices = new int[CAPACITY];
+        choices = new int[capacity];
 
         current = new AtomicInteger(0);
         // allocate another array to be switched
-        prepareChoices = new int[CAPACITY];
+        prepareChoices = new int[capacity];
         updateRing(null);
     }
 
@@ -114,9 +115,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         int rightNow;
         while (true) {
             rightNow = current.incrementAndGet();
-            if (rightNow < CAPACITY) {
+            if (rightNow < capacity) {
                 return rets[choices[rightNow]];
-            } else if (rightNow == CAPACITY) {
+            } else if (rightNow == capacity) {
                 current.set(0);
                 return rets[choices[0]];
             }
@@ -163,7 +164,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         if (targetInScope.isEmpty()) {
             Scope upScope = Scope.upgrade(currentScope);
             if (upScope == currentScope) {
-                throw new RuntimeException("This executor has no target tasks.");
+                throw new RuntimeException("The current scope " + currentScope + " has no target tasks.");
             }
             currentScope = upScope;
             return transition(load);
@@ -222,21 +223,24 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         if (weightSum > 0) {
             for (int target: targetsInScope) {
                 IndexAndWeights indexAndWeights = orig.get(target);
-                int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY);
-                for (int i = 0; i < count && currentIdx < CAPACITY; i++) {
+                int count = (int) ((indexAndWeights.weight / (double) weightSum) * capacity);
+                for (int i = 0; i < count && currentIdx < capacity; i++) {
                     prepareChoices[currentIdx] = indexAndWeights.index;
                     currentIdx++;
                 }
             }
 
-            //in case we didn't fill in enough
-            for (; currentIdx < CAPACITY; currentIdx++) {
-                prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+            if (currentIdx > 0) {
+                //in case we didn't fill in enough
+                for (; currentIdx < capacity; currentIdx++) {
+                    prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+                }
             }
-        } else {
+        }
+        if (currentIdx == 0) {
             //This really should be impossible, because we go off of the min load, and inc anything within 5% of it.
             // But just to be sure it is never an issue, especially with float rounding etc.
-            for (;currentIdx < CAPACITY; currentIdx++) {
+            for (;currentIdx < capacity; currentIdx++) {
                 prepareChoices[currentIdx] = currentIdx % rets.length;
             }
         }
@@ -322,4 +326,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
             }
         }
     }
+
+    //only for test
+    public int getCapacity() {
+        return capacity;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index d704900..12ab32a 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -101,10 +101,10 @@ public class LoadAwareShuffleGroupingTest {
             double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
             double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
             assertEquals("i = " + i,
-                expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(),
                 0.01);
             assertEquals("i = " + i,
-                expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+                expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(),
                 0.01);
         }
 
@@ -121,9 +121,9 @@ public class LoadAwareShuffleGroupingTest {
             LOG.info("contByType = {}", countByType);
             double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
             double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
-            assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+            assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(),
                 0.01);
-            assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+            assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(),
                 0.01);
         }
     }
@@ -138,9 +138,16 @@ public class LoadAwareShuffleGroupingTest {
     }
 
     @Test
-    public void testLoadAwareShuffleGroupingWithEvenLoad() {
-        // just pick arbitrary number
-        final int numTasks = 7;
+    public void testLoadAwareShuffleGroupingWithEvenLoadWithManyTargets() {
+        testLoadAwareShuffleGroupingWithEvenLoad(1000);
+    }
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithEvenLoadWithLessTargets() {
+        testLoadAwareShuffleGroupingWithEvenLoad(7);
+    }
+
+    private void testLoadAwareShuffleGroupingWithEvenLoad(int numTasks) {
         final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
 
         // Define our taskIds and loads
@@ -166,8 +173,16 @@ public class LoadAwareShuffleGroupingTest {
     }
 
     @Test
-    public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, ExecutionException {
-        final int numTasks = 7;
+    public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithManyTargets() throws ExecutionException, InterruptedException {
+        testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(1000);
+    }
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithLessTargets() throws ExecutionException, InterruptedException {
+        testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(7);
+    }
+
+    private void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(int numTasks) throws InterruptedException, ExecutionException {
 
         final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
 


[2/2] storm git commit: Merge branch 'STORM-2940-merge'

Posted by ka...@apache.org.
Merge branch 'STORM-2940-merge'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b320323
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b320323
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b320323

Branch: refs/heads/master
Commit: 0b320323936018e34c7469af98c76bea50d7dc54
Parents: a715e9a b332503
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 10 09:12:18 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 10 09:12:18 2018 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 35 ++++++++++++--------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 33 +++++++++++++-----
 2 files changed, 46 insertions(+), 22 deletions(-)
----------------------------------------------------------------------