You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:21 UTC
[3/8] storm git commit: STORM-2678 Improve performance of
LoadAwareShuffleGrouping
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* introduce a 'skip checking update count' counter
* we no longer call System.currentTimeMillis() on every invocation
* but we do call AtomicInteger.incrementAndGet() every time
* this may hurt multi-threaded performance slightly, but it is significantly faster in the single-threaded case
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f63d5a8
Branch: refs/heads/master
Commit: 8f63d5a85609ca71019ab298859af0b9858262b8
Parents: cab86d0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 17:48:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 18:26:21 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 28 ++++++++---
.../grouping/LoadAwareShuffleGroupingTest.java | 52 ++++++++++++--------
2 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index b460a8a..84e4612 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -26,12 +26,16 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+ @VisibleForTesting
+ static final int CHECK_UPDATE_INDEX = 100;
+
private Random random;
private List<Integer>[] rets;
private int[] targets;
@@ -39,8 +43,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private AtomicInteger current;
private int actualCapacity = 0;
+ private AtomicInteger skipCheckingUpdateCount;
+ private AtomicBoolean isUpdating;
private long lastUpdate = 0;
- private AtomicBoolean isUpdating = new AtomicBoolean(false);
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -65,6 +70,8 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
Collections.shuffle(choices, random);
current = new AtomicInteger(0);
+ skipCheckingUpdateCount = new AtomicInteger(0);
+ isUpdating = new AtomicBoolean(false);
}
@Override
@@ -74,13 +81,18 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- if ((lastUpdate + 1000) < System.currentTimeMillis()
- && isUpdating.compareAndSet(false, true)) {
- // update time earlier to reduce chance to do CAS
- // concurrent call can still rely on old choices
- lastUpdate = System.currentTimeMillis();
- updateRing(load);
- isUpdating.set(false);
+ if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
+ skipCheckingUpdateCount.set(0);
+ if ((lastUpdate + 1000) < System.currentTimeMillis()
+ && isUpdating.compareAndSet(false, true)) {
+ // before finishing updateRing(), concurrent call will still rely on old choices
+ updateRing(load);
+
+ // update time and open a chance to update ring again
+ lastUpdate = System.currentTimeMillis();
+ skipCheckingUpdateCount.set(0);
+ isUpdating.set(false);
+ }
}
int rightNow;
http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index bf2269f..667f36d 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class LoadAwareShuffleGroupingTest {
+ public static final double ACCEPTABLE_MARGIN = 0.015;
private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
@Test
@@ -86,17 +87,19 @@ public class LoadAwareShuffleGroupingTest {
final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-
- // Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ // force triggers building ring
+ for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
// calling chooseTasks should be finished before refreshing ring
// adjusting groupingExecutionsPerThread might be needed with really slow machine
// we allow race condition between refreshing ring and choosing tasks
// so it will not make exact even distribution, though diff is expected to be small
+ // given that all threadTasks are finished before refreshing ring,
+ // distribution should be exactly even
final int groupingExecutionsPerThread = numTasks * 5000;
final int numThreads = 10;
@@ -142,6 +145,19 @@ public class LoadAwareShuffleGroupingTest {
}
}
+ int[] loads = new int[numTasks];
+ int localTotal = 0;
+ List<Double> loadRate = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ int val = (int)(101 - (loadMapping.get(i) * 100));
+ loads[i] = val;
+ localTotal += val;
+ }
+
+ for (int i = 0; i < numTasks; i++) {
+ loadRate.add(loads[i] * 1.0 / localTotal);
+ }
+
for (int i = 0; i < numTasks; i++) {
int expected = numThreads * groupingExecutionsPerThread / numTasks;
assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
@@ -164,7 +180,7 @@ public class LoadAwareShuffleGroupingTest {
@Test
public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
- for (int trial = 0 ; trial < 100 ; trial++) {
+ for (int trial = 0 ; trial < 200 ; trial++) {
// just pick arbitrary number in 5 ~ 100
final int numTasks = new Random().nextInt(96) + 5;
final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
@@ -185,8 +201,8 @@ public class LoadAwareShuffleGroupingTest {
.mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
Lists.newArrayList(1, 2), Collections.emptyMap());
int numMessages = 100000;
- int minPrCount = (int) (numMessages * 0.49);
- int maxPrCount = (int) (numMessages * 0.51);
+ int minPrCount = (int) (numMessages * (0.5 - ACCEPTABLE_MARGIN));
+ int maxPrCount = (int) (numMessages * (0.5 + ACCEPTABLE_MARGIN));
LoadMapping load = new LoadMapping();
Map<Integer, Double> loadInfoMap = new HashMap<>();
loadInfoMap.put(1, 0.0);
@@ -220,10 +236,10 @@ public class LoadAwareShuffleGroupingTest {
.mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
Lists.newArrayList(1, 2), Collections.emptyMap());
int numMessages = 100000;
- int min1PrCount = (int) (numMessages * 0.32);
- int max1PrCount = (int) (numMessages * 0.34);
- int min2PrCount = (int) (numMessages * 0.65);
- int max2PrCount = (int) (numMessages * 0.67);
+ int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
+ int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
+ int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
+ int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
LoadMapping load = new LoadMapping();
Map<Integer, Double> loadInfoMap = new HashMap<>();
loadInfoMap.put(1, 0.5);
@@ -337,8 +353,10 @@ public class LoadAwareShuffleGroupingTest {
// Task Id not used, so just pick a static value
int inputTaskId = 100;
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ // force triggers building ring
+ for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
@@ -379,7 +397,7 @@ public class LoadAwareShuffleGroupingTest {
int totalEmits = 5000 * numTasks;
int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
- int delta = (int) (totalEmits * 0.01);
+ int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
for (int i = 0; i < numTasks; i++) {
int expected = (int) (totalEmits * loadRate.get(i));
assertTrue("Distribution should respect the task load with small delta",
@@ -395,9 +413,6 @@ public class LoadAwareShuffleGroupingTest {
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
grouper.prepare(context, null, availableTaskIds);
- // triggers building distribution ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -431,9 +446,6 @@ public class LoadAwareShuffleGroupingTest {
// Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
long current = System.currentTimeMillis();
int idx = 0;
while (true) {