You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:21 UTC

[3/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

STORM-2678 Improve performance of LoadAwareShuffleGrouping

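* introduce a 'skip checking update count' counter: the ring-update check now runs only once every CHECK_UPDATE_INDEX calls to chooseTasks()
  * we no longer call System.currentTimeMillis() on every call
  * instead, we call AtomicInteger.incrementAndGet() on every call
  * this may hurt multi-threaded performance, but it is considerably faster in the single-threaded case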


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f63d5a8

Branch: refs/heads/master
Commit: 8f63d5a85609ca71019ab298859af0b9858262b8
Parents: cab86d0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 17:48:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 18:26:21 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 28 ++++++++---
 .../grouping/LoadAwareShuffleGroupingTest.java  | 52 ++++++++++++--------
 2 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index b460a8a..84e4612 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -26,12 +26,16 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
     private static final int CAPACITY_TASK_MULTIPLICATION = 100;
 
+    @VisibleForTesting
+    static final int CHECK_UPDATE_INDEX = 100;
+
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
@@ -39,8 +43,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     private AtomicInteger current;
     private int actualCapacity = 0;
 
+    private AtomicInteger skipCheckingUpdateCount;
+    private AtomicBoolean isUpdating;
     private long lastUpdate = 0;
-    private AtomicBoolean isUpdating = new AtomicBoolean(false);
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -65,6 +70,8 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
         Collections.shuffle(choices, random);
         current = new AtomicInteger(0);
+        skipCheckingUpdateCount = new AtomicInteger(0);
+        isUpdating = new AtomicBoolean(false);
     }
 
     @Override
@@ -74,13 +81,18 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-        if ((lastUpdate + 1000) < System.currentTimeMillis()
-            && isUpdating.compareAndSet(false, true)) {
-            // update time earlier to reduce chance to do CAS
-            // concurrent call can still rely on old choices
-            lastUpdate = System.currentTimeMillis();
-            updateRing(load);
-            isUpdating.set(false);
+        if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
+            skipCheckingUpdateCount.set(0);
+            if ((lastUpdate + 1000) < System.currentTimeMillis()
+                && isUpdating.compareAndSet(false, true)) {
+                // before finishing updateRing(), concurrent call will still rely on old choices
+                updateRing(load);
+
+                // update time and open a chance to update ring again
+                lastUpdate = System.currentTimeMillis();
+                skipCheckingUpdateCount.set(0);
+                isUpdating.set(false);
+            }
         }
 
         int rightNow;

http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index bf2269f..667f36d 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class LoadAwareShuffleGroupingTest {
+    public static final double ACCEPTABLE_MARGIN = 0.015;
     private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
 
     @Test
@@ -86,17 +87,19 @@ public class LoadAwareShuffleGroupingTest {
         final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
 
         final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-
-        // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        // force triggers building ring
+        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        }
 
         // calling chooseTasks should be finished before refreshing ring
         // adjusting groupingExecutionsPerThread might be needed with really slow machine
         // we allow race condition between refreshing ring and choosing tasks
         // so it will not make exact even distribution, though diff is expected to be small
+        // given that all threadTasks are finished before refreshing ring,
+        // distribution should be exactly even
         final int groupingExecutionsPerThread = numTasks * 5000;
         final int numThreads = 10;
 
@@ -142,6 +145,19 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
+        int[] loads = new int[numTasks];
+        int localTotal = 0;
+        List<Double> loadRate = new ArrayList<>();
+        for (int i = 0; i < numTasks; i++) {
+            int val = (int)(101 - (loadMapping.get(i) * 100));
+            loads[i] = val;
+            localTotal += val;
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            loadRate.add(loads[i] * 1.0 / localTotal);
+        }
+
         for (int i = 0; i < numTasks; i++) {
             int expected = numThreads * groupingExecutionsPerThread / numTasks;
             assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
@@ -164,7 +180,7 @@ public class LoadAwareShuffleGroupingTest {
 
     @Test
     public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
-        for (int trial = 0 ; trial < 100 ; trial++) {
+        for (int trial = 0 ; trial < 200 ; trial++) {
             // just pick arbitrary number in 5 ~ 100
             final int numTasks = new Random().nextInt(96) + 5;
             final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
@@ -185,8 +201,8 @@ public class LoadAwareShuffleGroupingTest {
             .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
                 Lists.newArrayList(1, 2), Collections.emptyMap());
         int numMessages = 100000;
-        int minPrCount = (int) (numMessages * 0.49);
-        int maxPrCount = (int) (numMessages * 0.51);
+        int minPrCount = (int) (numMessages * (0.5 - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (numMessages * (0.5 + ACCEPTABLE_MARGIN));
         LoadMapping load = new LoadMapping();
         Map<Integer, Double> loadInfoMap = new HashMap<>();
         loadInfoMap.put(1, 0.0);
@@ -220,10 +236,10 @@ public class LoadAwareShuffleGroupingTest {
             .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
                 Lists.newArrayList(1, 2), Collections.emptyMap());
         int numMessages = 100000;
-        int min1PrCount = (int) (numMessages * 0.32);
-        int max1PrCount = (int) (numMessages * 0.34);
-        int min2PrCount = (int) (numMessages * 0.65);
-        int max2PrCount = (int) (numMessages * 0.67);
+        int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
+        int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
+        int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
+        int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
         LoadMapping load = new LoadMapping();
         Map<Integer, Double> loadInfoMap = new HashMap<>();
         loadInfoMap.put(1, 0.5);
@@ -337,8 +353,10 @@ public class LoadAwareShuffleGroupingTest {
         // Task Id not used, so just pick a static value
         int inputTaskId = 100;
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        // force triggers building ring
+        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        }
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
@@ -379,7 +397,7 @@ public class LoadAwareShuffleGroupingTest {
         int totalEmits = 5000 * numTasks;
         int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
 
-        int delta = (int) (totalEmits * 0.01);
+        int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
         for (int i = 0; i < numTasks; i++) {
             int expected = (int) (totalEmits * loadRate.get(i));
             assertTrue("Distribution should respect the task load with small delta",
@@ -395,9 +413,6 @@ public class LoadAwareShuffleGroupingTest {
         WorkerTopologyContext context = mock(WorkerTopologyContext.class);
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building distribution ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -431,9 +446,6 @@ public class LoadAwareShuffleGroupingTest {
         // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {