You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:25 UTC

[7/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

STORM-2678 Improve performance of LoadAwareShuffleGrouping

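* Remove the LoadAware-specific chooseTasks(taskId, values, load) overload;
  callers now use chooseTasks(taskId, values), and load is supplied via refreshLoad(loadMapping)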


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dade36c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dade36c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dade36c

Branch: refs/heads/master
Commit: 4dade36c6200a1fea06aa207f3f6d8470b725d0f
Parents: ab38a7a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 24 09:15:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 24 09:15:27 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/daemon/GrouperFactory.java    | 10 ----------
 .../src/jvm/org/apache/storm/daemon/Task.java      |  2 +-
 .../grouping/LoadAwareCustomStreamGrouping.java    |  3 ---
 .../storm/grouping/LoadAwareShuffleGrouping.java   | 15 +++++----------
 .../grouping/LoadAwareShuffleGroupingTest.java     | 17 ++++++++---------
 .../test/clj/org/apache/storm/grouping_test.clj    |  2 +-
 6 files changed, 15 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 03e029e..179e882 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -124,11 +124,6 @@ public class GrouperFactory {
         }
 
         @Override
-        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-            return customStreamGrouping.chooseTasks(taskId, values);
-        }
-
-        @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
             customStreamGrouping.prepare(context, stream, targetTasks);
         }
@@ -234,11 +229,6 @@ public class GrouperFactory {
         }
 
         @Override
-        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-            return null;
-        }
-
-        @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
 
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index b79a259..0bc56fb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -137,7 +137,7 @@ public class Task {
                 if (grouper == GrouperFactory.DIRECT) {
                     throw new IllegalArgumentException("Cannot do regular emit to direct stream");
                 }
-                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+                List<Integer> compTasks = grouper.chooseTasks(taskId, values);
                 outTasks.addAll(compTasks);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index fba7254..6e5a3fd 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -17,9 +17,6 @@
  */
 package org.apache.storm.grouping;
 
-import java.util.List;
-
 public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
-   List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 97c5ce1..0c0560a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -74,16 +74,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        throw new RuntimeException("NOT IMPLEMENTED");
-    }
-
-    @Override
-    public void refreshLoad(LoadMapping loadMapping) {
-        updateRing(loadMapping);
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
         int rightNow;
         while (true) {
             rightNow = current.incrementAndGet();
@@ -98,6 +88,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
     }
 
+    @Override
+    public void refreshLoad(LoadMapping loadMapping) {
+        updateRing(loadMapping);
+    }
+
     private void updateRing(LoadMapping load) {
         int localTotal = 0;
         for (int i = 0 ; i < targets.length; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2d90d66..53ac404 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -117,8 +117,7 @@ public class LoadAwareShuffleGroupingTest {
                 public int[] call() throws Exception {
                     int[] taskCounts = new int[availableTaskIds.size()];
                     for (int i = 1; i <= groupingExecutionsPerThread; i++) {
-                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
-                            Lists.newArrayList(), loadMapping);
+                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
                         // Validate a single task id return
                         assertNotNull("Not null taskId list returned", taskIds);
@@ -212,7 +211,7 @@ public class LoadAwareShuffleGroupingTest {
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3];
         for (int i = 0 ; i < numMessages ; i++) {
-            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            List<Integer> tasks = shuffler.chooseTasks(1, data);
             for (int task : tasks) {
                 frequencies[task]++;
             }
@@ -252,7 +251,7 @@ public class LoadAwareShuffleGroupingTest {
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3]; // task id starts from 1
         for (int i = 0 ; i < numMessages ; i++) {
-            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            List<Integer> tasks = shuffler.chooseTasks(1, data);
             for (int task : tasks) {
                 frequencies[task]++;
             }
@@ -361,7 +360,7 @@ public class LoadAwareShuffleGroupingTest {
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
-                .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                .chooseTasks(inputTaskId, Lists.newArrayList());
 
             // Validate a single task id return
             assertNotNull("Not null taskId list returned", taskIds);
@@ -422,7 +421,7 @@ public class LoadAwareShuffleGroupingTest {
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
             idx++;
             if (idx % 100000 == 0) {
@@ -435,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
 
         current = System.currentTimeMillis();
         for (int i = 1; i <= 2_000_000_000; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
         }
 
         LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
@@ -462,7 +461,7 @@ public class LoadAwareShuffleGroupingTest {
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
             idx++;
             if (idx % 100000 == 0) {
@@ -482,7 +481,7 @@ public class LoadAwareShuffleGroupingTest {
                 public Long call() throws Exception {
                     long current = System.currentTimeMillis();
                     for (int i = 1; i <= groupingExecutionsPerThread; i++) {
-                        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                        grouper.chooseTasks(inputTaskId, Lists.newArrayList());
                     }
                     return System.currentTimeMillis() - current;
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 68ebb4e..c216008 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -32,7 +32,7 @@
        min-prcnt (int (* num-messages 0.49))
        max-prcnt (int (* num-messages 0.51))
        data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
+       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data)))
        load1 (.get freq [(int 1)])
        load2 (.get freq [(int 2)])]
     (log-message "FREQ:" freq)