You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:25 UTC
[7/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* Remove the LoadAware-specific chooseTasks(taskId, values, load) overload; callers now use chooseTasks(taskId, values)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dade36c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dade36c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dade36c
Branch: refs/heads/master
Commit: 4dade36c6200a1fea06aa207f3f6d8470b725d0f
Parents: ab38a7a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 24 09:15:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 24 09:15:27 2017 +0900
----------------------------------------------------------------------
.../org/apache/storm/daemon/GrouperFactory.java | 10 ----------
.../src/jvm/org/apache/storm/daemon/Task.java | 2 +-
.../grouping/LoadAwareCustomStreamGrouping.java | 3 ---
.../storm/grouping/LoadAwareShuffleGrouping.java | 15 +++++----------
.../grouping/LoadAwareShuffleGroupingTest.java | 17 ++++++++---------
.../test/clj/org/apache/storm/grouping_test.clj | 2 +-
6 files changed, 15 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 03e029e..179e882 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -124,11 +124,6 @@ public class GrouperFactory {
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- return customStreamGrouping.chooseTasks(taskId, values);
- }
-
- @Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
customStreamGrouping.prepare(context, stream, targetTasks);
}
@@ -234,11 +229,6 @@ public class GrouperFactory {
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- return null;
- }
-
- @Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index b79a259..0bc56fb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -137,7 +137,7 @@ public class Task {
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
- List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+ List<Integer> compTasks = grouper.chooseTasks(taskId, values);
outTasks.addAll(compTasks);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index fba7254..6e5a3fd 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -17,9 +17,6 @@
*/
package org.apache.storm.grouping;
-import java.util.List;
-
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
void refreshLoad(LoadMapping loadMapping);
- List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 97c5ce1..0c0560a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -74,16 +74,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
- throw new RuntimeException("NOT IMPLEMENTED");
- }
-
- @Override
- public void refreshLoad(LoadMapping loadMapping) {
- updateRing(loadMapping);
- }
-
- @Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
int rightNow;
while (true) {
rightNow = current.incrementAndGet();
@@ -98,6 +88,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
}
+ @Override
+ public void refreshLoad(LoadMapping loadMapping) {
+ updateRing(loadMapping);
+ }
+
private void updateRing(LoadMapping load) {
int localTotal = 0;
for (int i = 0 ; i < targets.length; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2d90d66..53ac404 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -117,8 +117,7 @@ public class LoadAwareShuffleGroupingTest {
public int[] call() throws Exception {
int[] taskCounts = new int[availableTaskIds.size()];
for (int i = 1; i <= groupingExecutionsPerThread; i++) {
- List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
- Lists.newArrayList(), loadMapping);
+ List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
@@ -212,7 +211,7 @@ public class LoadAwareShuffleGroupingTest {
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3];
for (int i = 0 ; i < numMessages ; i++) {
- List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ List<Integer> tasks = shuffler.chooseTasks(1, data);
for (int task : tasks) {
frequencies[task]++;
}
@@ -252,7 +251,7 @@ public class LoadAwareShuffleGroupingTest {
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3]; // task id starts from 1
for (int i = 0 ; i < numMessages ; i++) {
- List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ List<Integer> tasks = shuffler.chooseTasks(1, data);
for (int task : tasks) {
frequencies[task]++;
}
@@ -361,7 +360,7 @@ public class LoadAwareShuffleGroupingTest {
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
- .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ .chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
@@ -422,7 +421,7 @@ public class LoadAwareShuffleGroupingTest {
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
idx++;
if (idx % 100000 == 0) {
@@ -435,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
current = System.currentTimeMillis();
for (int i = 1; i <= 2_000_000_000; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
}
LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
@@ -462,7 +461,7 @@ public class LoadAwareShuffleGroupingTest {
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
idx++;
if (idx % 100000 == 0) {
@@ -482,7 +481,7 @@ public class LoadAwareShuffleGroupingTest {
public Long call() throws Exception {
long current = System.currentTimeMillis();
for (int i = 1; i <= groupingExecutionsPerThread; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
}
return System.currentTimeMillis() - current;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 68ebb4e..c216008 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -32,7 +32,7 @@
min-prcnt (int (* num-messages 0.49))
max-prcnt (int (* num-messages 0.51))
data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
+ freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)