You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/10 00:12:24 UTC
[1/2] storm git commit: STORM-2940 dynamically set the CAPACITY value
of LoadAwareShuffleGrouping
Repository: storm
Updated Branches:
refs/heads/master a715e9a5c -> 0b3203239
STORM-2940 dynamically set the CAPACITY value of LoadAwareShuffleGrouping
* lose granularity if targetTask size is larger than 1000
* add tests
This closes #2551
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b332503b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b332503b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b332503b
Branch: refs/heads/master
Commit: b332503b113fa6cde6afa320c9c190e3aad5b4c3
Parents: a715e9a
Author: Ethan Li <et...@gmail.com>
Authored: Wed Feb 7 13:40:45 2018 -0600
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 10 09:11:37 2018 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 35 ++++++++++++--------
.../grouping/LoadAwareShuffleGroupingTest.java | 33 +++++++++++++-----
2 files changed, 46 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 3fd75e5..832ba7f 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
- static final int CAPACITY = 1000;
+ private int capacity;
private static final int MAX_WEIGHT = 100;
private static class IndexAndWeights {
final int index;
@@ -85,6 +85,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
sourceNodeInfo = new NodeInfo(context.getThisWorkerHost(), Sets.newHashSet((long) context.getThisWorkerPort()));
taskToNodePort = context.getTaskToNodePort();
this.targetTasks = targetTasks;
+ capacity = targetTasks.size() == 1 ? 1 : Math.max(1000, targetTasks.size() * 5);
conf = context.getConf();
dnsToSwitchMapping = ReflectionUtils.newInstance((String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN));
localityGroup = new HashMap<>();
@@ -101,11 +102,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
// can't leave choices to be empty, so initiate it similar as ShuffleGrouping
- choices = new int[CAPACITY];
+ choices = new int[capacity];
current = new AtomicInteger(0);
// allocate another array to be switched
- prepareChoices = new int[CAPACITY];
+ prepareChoices = new int[capacity];
updateRing(null);
}
@@ -114,9 +115,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
int rightNow;
while (true) {
rightNow = current.incrementAndGet();
- if (rightNow < CAPACITY) {
+ if (rightNow < capacity) {
return rets[choices[rightNow]];
- } else if (rightNow == CAPACITY) {
+ } else if (rightNow == capacity) {
current.set(0);
return rets[choices[0]];
}
@@ -163,7 +164,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
if (targetInScope.isEmpty()) {
Scope upScope = Scope.upgrade(currentScope);
if (upScope == currentScope) {
- throw new RuntimeException("This executor has no target tasks.");
+ throw new RuntimeException("The current scope " + currentScope + " has no target tasks.");
}
currentScope = upScope;
return transition(load);
@@ -222,21 +223,24 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
if (weightSum > 0) {
for (int target: targetsInScope) {
IndexAndWeights indexAndWeights = orig.get(target);
- int count = (int) ((indexAndWeights.weight / (double) weightSum) * CAPACITY);
- for (int i = 0; i < count && currentIdx < CAPACITY; i++) {
+ int count = (int) ((indexAndWeights.weight / (double) weightSum) * capacity);
+ for (int i = 0; i < count && currentIdx < capacity; i++) {
prepareChoices[currentIdx] = indexAndWeights.index;
currentIdx++;
}
}
- //in case we didn't fill in enough
- for (; currentIdx < CAPACITY; currentIdx++) {
- prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+ if (currentIdx > 0) {
+ //in case we didn't fill in enough
+ for (; currentIdx < capacity; currentIdx++) {
+ prepareChoices[currentIdx] = prepareChoices[random.nextInt(currentIdx)];
+ }
}
- } else {
+ }
+ if (currentIdx == 0) {
//This really should be impossible, because we go off of the min load, and inc anything within 5% of it.
// But just to be sure it is never an issue, especially with float rounding etc.
- for (;currentIdx < CAPACITY; currentIdx++) {
+ for (;currentIdx < capacity; currentIdx++) {
prepareChoices[currentIdx] = currentIdx % rets.length;
}
}
@@ -322,4 +326,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
}
}
+
+ //only for test
+ public int getCapacity() {
+ return capacity;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/b332503b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index d704900..12ab32a 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -101,10 +101,10 @@ public class LoadAwareShuffleGroupingTest {
double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
assertEquals("i = " + i,
- expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(),
0.01);
assertEquals("i = " + i,
- expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(),
0.01);
}
@@ -121,9 +121,9 @@ public class LoadAwareShuffleGroupingTest {
LOG.info("contByType = {}", countByType);
double expectedOnePercentage = expectedOneWeight / (expectedOneWeight + expectedTwoWeight);
double expectedTwoPercentage = expectedTwoWeight / (expectedOneWeight + expectedTwoWeight);
- assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ assertEquals(expectedOnePercentage, countByType.getOrDefault(1, 0.0) / grouping.getCapacity(),
0.01);
- assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / LoadAwareShuffleGrouping.CAPACITY,
+ assertEquals(expectedTwoPercentage, countByType.getOrDefault(2, 0.0) / grouping.getCapacity(),
0.01);
}
}
@@ -138,9 +138,16 @@ public class LoadAwareShuffleGroupingTest {
}
@Test
- public void testLoadAwareShuffleGroupingWithEvenLoad() {
- // just pick arbitrary number
- final int numTasks = 7;
+ public void testLoadAwareShuffleGroupingWithEvenLoadWithManyTargets() {
+ testLoadAwareShuffleGroupingWithEvenLoad(1000);
+ }
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithEvenLoadWithLessTargets() {
+ testLoadAwareShuffleGroupingWithEvenLoad(7);
+ }
+
+ private void testLoadAwareShuffleGroupingWithEvenLoad(int numTasks) {
final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
// Define our taskIds and loads
@@ -166,8 +173,16 @@ public class LoadAwareShuffleGroupingTest {
}
@Test
- public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, ExecutionException {
- final int numTasks = 7;
+ public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithManyTargets() throws ExecutionException, InterruptedException {
+ testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(1000);
+ }
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreadedWithLessTargets() throws ExecutionException, InterruptedException {
+ testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(7);
+ }
+
+ private void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded(int numTasks) throws InterruptedException, ExecutionException {
final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
[2/2] storm git commit: Merge branch 'STORM-2940-merge'
Posted by ka...@apache.org.
Merge branch 'STORM-2940-merge'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b320323
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b320323
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b320323
Branch: refs/heads/master
Commit: 0b320323936018e34c7469af98c76bea50d7dc54
Parents: a715e9a b332503
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 10 09:12:18 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 10 09:12:18 2018 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 35 ++++++++++++--------
.../grouping/LoadAwareShuffleGroupingTest.java | 33 +++++++++++++-----
2 files changed, 46 insertions(+), 22 deletions(-)
----------------------------------------------------------------------