You are viewing a plain-text version of this content. (The hyperlink to the canonical version, originally labeled "here", did not survive the plain-text conversion.)
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:19 UTC
[1/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Repository: storm
Updated Branches:
refs/heads/master 9bf202277 -> dad14e414
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* construct a ring that represents the distribution of tasks based on their load
* chooseTasks() just walks the ring sequentially
* port the related tests from Clojure to Java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48427880
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48427880
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48427880
Branch: refs/heads/master
Commit: 48427880a483e1d86f9dbe99ef46e395e3a40308
Parents: 77354fe
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 13:25:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 15:17:24 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 104 ++++-
.../grouping/LoadAwareShuffleGroupingTest.java | 399 +++++++++++++++++++
.../test/clj/org/apache/storm/grouping_test.clj | 36 --
3 files changed, 485 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 9a07194..b460a8a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -20,30 +20,51 @@ package org.apache.storm.grouping;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
+ private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+
private Random random;
private List<Integer>[] rets;
private int[] targets;
- private int[] loads;
- private int total;
+ private ArrayList<List<Integer>> choices;
+ private AtomicInteger current;
+ private int actualCapacity = 0;
+
private long lastUpdate = 0;
+ private AtomicBoolean isUpdating = new AtomicBoolean(false);
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
random = new Random();
+
rets = (List<Integer>[])new List<?>[targetTasks.size()];
targets = new int[targetTasks.size()];
for (int i = 0; i < targets.length; i++) {
rets[i] = Arrays.asList(targetTasks.get(i));
targets[i] = targetTasks.get(i);
}
- loads = new int[targets.length];
+
+ actualCapacity = targets.length * CAPACITY_TASK_MULTIPLICATION;
+
+ // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
+ choices = new ArrayList<>(actualCapacity);
+ int index = 0;
+ while (index < actualCapacity) {
+ choices.add(rets[index % targets.length]);
+ index++;
+ }
+
+ Collections.shuffle(choices, random);
+ current = new AtomicInteger(0);
}
@Override
@@ -53,24 +74,71 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- if ((lastUpdate + 1000) < System.currentTimeMillis()) {
- int local_total = 0;
- for (int i = 0; i < targets.length; i++) {
- int val = (int)(101 - (load.get(targets[i]) * 100));
- loads[i] = val;
- local_total += val;
- }
- total = local_total;
+ if ((lastUpdate + 1000) < System.currentTimeMillis()
+ && isUpdating.compareAndSet(false, true)) {
+ // update time earlier to reduce chance to do CAS
+ // concurrent call can still rely on old choices
lastUpdate = System.currentTimeMillis();
+ updateRing(load);
+ isUpdating.set(false);
}
- int selected = random.nextInt(total);
- int sum = 0;
- for (int i = 0; i < targets.length; i++) {
- sum += loads[i];
- if (selected < sum) {
- return rets[i];
+
+ int rightNow;
+ int size = choices.size();
+ while (true) {
+ rightNow = current.incrementAndGet();
+ if (rightNow < size) {
+ return choices.get(rightNow);
+ } else if (rightNow == size) {
+ current.set(0);
+ return choices.get(0);
}
+ //race condition with another thread, and we lost
+ // try again
}
- return rets[rets.length-1];
+ }
+
+ private void updateRing(LoadMapping load) {
+ int localTotal = 0;
+ int[] loads = new int[targets.length];
+ for (int i = 0 ; i < targets.length; i++) {
+ int val = (int)(101 - (load.get(targets[i]) * 100));
+ loads[i] = val;
+ localTotal += val;
+ }
+
+ // allocating enough memory doesn't hurt much, so assign aggressively
+ // we will cut out if actual size becomes bigger than actualCapacity
+ ArrayList<List<Integer>> newChoices = new ArrayList<>(actualCapacity + targets.length);
+ for (int i = 0 ; i < loads.length ; i++) {
+ int loadForTask = loads[i];
+ int amount = loadForTask * actualCapacity / localTotal;
+ // assign at least one for task
+ if (amount == 0) {
+ amount = 1;
+ }
+ for (int j = 0; j < amount; j++) {
+ newChoices.add(rets[i]);
+ }
+ }
+
+ Collections.shuffle(newChoices, random);
+
+ // make sure length of newChoices is same as actualCapacity, like current choices
+ // this ensures safety when requests and update occurs concurrently
+ if (newChoices.size() > actualCapacity) {
+ newChoices.subList(actualCapacity, newChoices.size()).clear();
+ } else if (newChoices.size() < actualCapacity) {
+ int remaining = actualCapacity - newChoices.size();
+ while (remaining > 0) {
+ newChoices.add(newChoices.get(remaining % newChoices.size()));
+ remaining--;
+ }
+ }
+
+ assert newChoices.size() == actualCapacity;
+
+ choices = newChoices;
+ current.set(0);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
new file mode 100644
index 0000000..410c1f9
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class LoadAwareShuffleGroupingTest {
+ private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithEvenLoad() {
+ // just pick arbitrary number
+ final int numTasks = 7;
+ final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+ // Define our taskIds and loads
+ final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
+
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouper.prepare(context, null, availableTaskIds);
+
+ // Keep track of how many times we see each taskId
+ // distribution should be even for all nodes when loads are even
+ int desiredTaskCountPerTask = 5000;
+ int totalEmits = numTasks * desiredTaskCountPerTask;
+
+ int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
+
+ for (int i = 0; i < numTasks; i++) {
+ assertEquals("Distribution should be even for all nodes", desiredTaskCountPerTask,
+ taskCounts[i]);
+ }
+ }
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, ExecutionException {
+ final int numTasks = 7;
+
+ final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+ // Task Id not used, so just pick a static value
+ final int inputTaskId = 100;
+ // Define our taskIds - the test expects these to be incrementing by one up from zero
+ final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
+
+ final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+ // Call prepare with our available taskIds
+ grouper.prepare(context, null, availableTaskIds);
+
+ // triggers building ring
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+ // calling chooseTasks should be finished before refreshing ring
+ // adjusting groupingExecutionsPerThread might be needed with really slow machine
+ // we allow race condition between refreshing ring and choosing tasks
+ // so it will not make exact even distribution, though diff is expected to be small
+ final int groupingExecutionsPerThread = numTasks * 5000;
+ final int numThreads = 10;
+
+ List<Callable<int[]>> threadTasks = Lists.newArrayList();
+ for (int x = 0; x < numThreads; x++) {
+ Callable<int[]> threadTask = new Callable<int[]>() {
+ @Override
+ public int[] call() throws Exception {
+ int[] taskCounts = new int[availableTaskIds.size()];
+ for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+ List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
+ Lists.newArrayList(), loadMapping);
+
+ // Validate a single task id return
+ assertNotNull("Not null taskId list returned", taskIds);
+ assertEquals("Single task Id returned", 1, taskIds.size());
+
+ int taskId = taskIds.get(0);
+
+ assertTrue("TaskId should exist", taskId >= 0 && taskId < availableTaskIds.size());
+ taskCounts[taskId]++;
+ }
+ return taskCounts;
+ }
+ };
+
+ // Add to our collection.
+ threadTasks.add(threadTask);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+ List<Future<int[]>> taskResults = executor.invokeAll(threadTasks);
+
+ // Wait for all tasks to complete
+ int[] taskIdTotals = new int[numTasks];
+ for (Future taskResult: taskResults) {
+ while (!taskResult.isDone()) {
+ Thread.sleep(1000);
+ }
+ int[] taskDistributions = (int[]) taskResult.get();
+ for (int i = 0; i < taskDistributions.length; i++) {
+ taskIdTotals[i] += taskDistributions[i];
+ }
+ }
+
+ for (int i = 0; i < numTasks; i++) {
+ int expected = numThreads * groupingExecutionsPerThread / numTasks;
+ assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
+ }
+ }
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithUnevenLoad() {
+ // just pick arbitrary number
+ final int numTasks = 7;
+ final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+ // Define our taskIds and loads
+ final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ final LoadMapping loadMapping = buildLocalTasksUnevenLoadMapping(availableTaskIds);
+
+ runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
+ loadMapping);
+ }
+
+ @Test
+ public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
+ for (int trial = 0 ; trial < 100 ; trial++) {
+ // just pick arbitrary number in 5 ~ 100
+ final int numTasks = new Random().nextInt(96) + 5;
+ final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+ // Define our taskIds and loads
+ final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ final LoadMapping loadMapping = buildLocalTasksRandomLoadMapping(availableTaskIds);
+
+ runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
+ loadMapping);
+ }
+ }
+
+ @Test
+ public void testShuffleLoadEven() {
+ // port test-shuffle-load-even
+ LoadAwareCustomStreamGrouping shuffler = GrouperFactory
+ .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
+ Lists.newArrayList(1, 2), Collections.emptyMap());
+ int numMessages = 100000;
+ int minPrCount = (int) (numMessages * 0.49);
+ int maxPrCount = (int) (numMessages * 0.51);
+ LoadMapping load = new LoadMapping();
+ Map<Integer, Double> loadInfoMap = new HashMap<>();
+ loadInfoMap.put(1, 0.0);
+ loadInfoMap.put(2, 0.0);
+ load.setLocal(loadInfoMap);
+
+ List<Object> data = Lists.newArrayList(1, 2);
+ int[] frequencies = new int[3];
+ for (int i = 0 ; i < numMessages ; i++) {
+ List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ for (int task : tasks) {
+ frequencies[task]++;
+ }
+ }
+
+ int load1 = frequencies[1];
+ int load2 = frequencies[2];
+
+ LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
+
+ assertTrue(load1 >= minPrCount);
+ assertTrue(load1 <= maxPrCount);
+ assertTrue(load2 >= minPrCount);
+ assertTrue(load2 <= maxPrCount);
+ }
+
+ @Test
+ public void testShuffleLoadUneven() {
+ // port test-shuffle-load-uneven
+ LoadAwareCustomStreamGrouping shuffler = GrouperFactory
+ .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
+ Lists.newArrayList(1, 2), Collections.emptyMap());
+ int numMessages = 100000;
+ int min1PrCount = (int) (numMessages * 0.32);
+ int max1PrCount = (int) (numMessages * 0.34);
+ int min2PrCount = (int) (numMessages * 0.65);
+ int max2PrCount = (int) (numMessages * 0.67);
+ LoadMapping load = new LoadMapping();
+ Map<Integer, Double> loadInfoMap = new HashMap<>();
+ loadInfoMap.put(1, 0.5);
+ loadInfoMap.put(2, 0.0);
+ load.setLocal(loadInfoMap);
+
+ List<Object> data = Lists.newArrayList(1, 2);
+ int[] frequencies = new int[3]; // task id starts from 1
+ for (int i = 0 ; i < numMessages ; i++) {
+ List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ for (int task : tasks) {
+ frequencies[task]++;
+ }
+ }
+
+ int load1 = frequencies[1];
+ int load2 = frequencies[2];
+
+ LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
+
+ assertTrue(load1 >= min1PrCount);
+ assertTrue(load1 <= max1PrCount);
+ assertTrue(load2 >= min2PrCount);
+ assertTrue(load2 <= max2PrCount);
+ }
+
+ private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
+ int numTasks, LoadMapping loadMapping) {
+ int[] taskCounts = new int[numTasks];
+
+ // Task Id not used, so just pick a static value
+ int inputTaskId = 100;
+
+ // triggers building ring
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+ for (int i = 1; i <= totalEmits; i++) {
+ List<Integer> taskIds = grouper
+ .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+ // Validate a single task id return
+ assertNotNull("Not null taskId list returned", taskIds);
+ assertEquals("Single task Id returned", 1, taskIds.size());
+
+ int taskId = taskIds.get(0);
+
+ assertTrue("TaskId should exist", taskId >= 0 && taskId < numTasks);
+ taskCounts[taskId]++;
+ }
+ return taskCounts;
+ }
+
+ private void runDistributionVerificationTestWithUnevenLoad(int numTasks,
+ LoadAwareShuffleGrouping grouper, List<Integer> availableTaskIds,
+ LoadMapping loadMapping) {
+ int[] loads = new int[numTasks];
+ int localTotal = 0;
+ List<Double> loadRate = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ int val = (int)(101 - (loadMapping.get(i) * 100));
+ loads[i] = val;
+ localTotal += val;
+ }
+
+ for (int i = 0; i < numTasks; i++) {
+ loadRate.add(loads[i] * 1.0 / localTotal);
+ }
+
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouper.prepare(context, null, availableTaskIds);
+
+ // Keep track of how many times we see each taskId
+ int totalEmits = 5000 * numTasks;
+ int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
+
+ int delta = (int) (totalEmits * 0.01);
+ for (int i = 0; i < numTasks; i++) {
+ int expected = (int) (totalEmits * loadRate.get(i));
+ assertTrue("Distribution should respect the task load with small delta",
+ taskCounts[i] >= expected - delta && taskCounts[i] <= expected + delta);
+ }
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksEvenLoadMapping(availableTaskIds));
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksUnevenLoadMapping(availableTaskIds));
+ }
+
+ private List<Integer> getAvailableTaskIds(int numTasks) {
+ // this method should return sequential numbers starting at 0
+ final List<Integer> availableTaskIds = Lists.newArrayList();
+ for (int i = 0; i < numTasks; i++) {
+ availableTaskIds.add(i);
+ }
+ return availableTaskIds;
+ }
+
+ private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1);
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), Math.random());
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+ List<Integer> availableTaskIds, LoadMapping loadMapping) {
+ // Task Id not used, so just pick a static value
+ final int inputTaskId = 100;
+
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouper.prepare(context, null, availableTaskIds);
+
+ // triggers building distribution ring
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+ long current = System.currentTimeMillis();
+ int idx = 0;
+ while (true) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+ idx++;
+ if (idx % 100000 == 0) {
+ // warm up 60 seconds
+ if (System.currentTimeMillis() - current >= 60_000) {
+ break;
+ }
+ }
+ }
+
+ current = System.currentTimeMillis();
+ for (int i = 1; i <= 2_000_000_000 ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
+
+ System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 9bfb8a7..68ebb4e 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -41,42 +41,6 @@
(is (>= load2 min-prcnt))
(is (<= load2 max-prcnt))))
-(deftest test-shuffle-load-even
- (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
- num-messages 100000
- min-prcnt (int (* num-messages 0.49))
- max-prcnt (int (* num-messages 0.51))
- load (LoadMapping.)
- _ (.setLocal load {(int 1) 0.0 (int 2) 0.0})
- data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
- load1 (.get freq [(int 1)])
- load2 (.get freq [(int 2)])]
- (log-message "FREQ:" freq)
- (is (>= load1 min-prcnt))
- (is (<= load1 max-prcnt))
- (is (>= load2 min-prcnt))
- (is (<= load2 max-prcnt))))
-
-(deftest test-shuffle-load-uneven
- (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
- num-messages 100000
- min1-prcnt (int (* num-messages 0.32))
- max1-prcnt (int (* num-messages 0.34))
- min2-prcnt (int (* num-messages 0.65))
- max2-prcnt (int (* num-messages 0.67))
- load (LoadMapping.)
- _ (.setLocal load {(int 1) 0.5 (int 2) 0.0})
- data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
- load1 (.get freq [(int 1)])
- load2 (.get freq [(int 2)])]
- (log-message "FREQ:" freq)
- (is (>= load1 min1-prcnt))
- (is (<= load1 max1-prcnt))
- (is (>= load2 min2-prcnt))
- (is (<= load2 max2-prcnt))))
-
(deftest test-field
(with-open [cluster (.build (doto (LocalCluster$Builder.)
(.withSimulatedTime)
[8/8] storm git commit: Merge branch 'STORM-2678' of https://github.com/HeartSaVioR/storm into STORM-2678
Posted by bo...@apache.org.
Merge branch 'STORM-2678' of https://github.com/HeartSaVioR/storm into STORM-2678
STORM-2678: Improve performance of LoadAwareShuffleGrouping
This closes #2261
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dad14e41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dad14e41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dad14e41
Branch: refs/heads/master
Commit: dad14e414619b1f336d18bc570ea165e072008ea
Parents: 9bf2022 4dade36
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 7 12:32:26 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 7 12:32:26 2017 -0500
----------------------------------------------------------------------
.../org/apache/storm/daemon/GrouperFactory.java | 8 +-
.../src/jvm/org/apache/storm/daemon/Task.java | 2 +-
.../org/apache/storm/daemon/worker/Worker.java | 11 +-
.../jvm/org/apache/storm/executor/Executor.java | 19 +
.../apache/storm/executor/ExecutorShutdown.java | 6 +
.../apache/storm/executor/IRunningExecutor.java | 2 +
.../grouping/LoadAwareCustomStreamGrouping.java | 4 +-
.../grouping/LoadAwareShuffleGrouping.java | 126 ++++-
.../grouping/LoadAwareShuffleGroupingTest.java | 513 +++++++++++++++++++
.../test/clj/org/apache/storm/grouping_test.clj | 38 +-
10 files changed, 664 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dad14e41/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dad14e41/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
[3/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* introduce a 'skip checking update count' counter
* we no longer call System.currentTimeMillis() on every call
* instead we call AtomicInteger.incrementAndGet() on every call
* this may hurt multi-threaded performance, but is much faster single-threaded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f63d5a8
Branch: refs/heads/master
Commit: 8f63d5a85609ca71019ab298859af0b9858262b8
Parents: cab86d0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 17:48:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 18:26:21 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 28 ++++++++---
.../grouping/LoadAwareShuffleGroupingTest.java | 52 ++++++++++++--------
2 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index b460a8a..84e4612 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -26,12 +26,16 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+ @VisibleForTesting
+ static final int CHECK_UPDATE_INDEX = 100;
+
private Random random;
private List<Integer>[] rets;
private int[] targets;
@@ -39,8 +43,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private AtomicInteger current;
private int actualCapacity = 0;
+ private AtomicInteger skipCheckingUpdateCount;
+ private AtomicBoolean isUpdating;
private long lastUpdate = 0;
- private AtomicBoolean isUpdating = new AtomicBoolean(false);
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -65,6 +70,8 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
Collections.shuffle(choices, random);
current = new AtomicInteger(0);
+ skipCheckingUpdateCount = new AtomicInteger(0);
+ isUpdating = new AtomicBoolean(false);
}
@Override
@@ -74,13 +81,18 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- if ((lastUpdate + 1000) < System.currentTimeMillis()
- && isUpdating.compareAndSet(false, true)) {
- // update time earlier to reduce chance to do CAS
- // concurrent call can still rely on old choices
- lastUpdate = System.currentTimeMillis();
- updateRing(load);
- isUpdating.set(false);
+ if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
+ skipCheckingUpdateCount.set(0);
+ if ((lastUpdate + 1000) < System.currentTimeMillis()
+ && isUpdating.compareAndSet(false, true)) {
+ // before finishing updateRing(), concurrent call will still rely on old choices
+ updateRing(load);
+
+ // update time and open a chance to update ring again
+ lastUpdate = System.currentTimeMillis();
+ skipCheckingUpdateCount.set(0);
+ isUpdating.set(false);
+ }
}
int rightNow;
http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index bf2269f..667f36d 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class LoadAwareShuffleGroupingTest {
+ public static final double ACCEPTABLE_MARGIN = 0.015;
private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
@Test
@@ -86,17 +87,19 @@ public class LoadAwareShuffleGroupingTest {
final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-
- // Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ // force triggers building ring
+ for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
// calling chooseTasks should be finished before refreshing ring
// adjusting groupingExecutionsPerThread might be needed with really slow machine
// we allow race condition between refreshing ring and choosing tasks
// so it will not make exact even distribution, though diff is expected to be small
+ // given that all threadTasks are finished before refreshing ring,
+ // distribution should be exactly even
final int groupingExecutionsPerThread = numTasks * 5000;
final int numThreads = 10;
@@ -142,6 +145,19 @@ public class LoadAwareShuffleGroupingTest {
}
}
+ int[] loads = new int[numTasks];
+ int localTotal = 0;
+ List<Double> loadRate = new ArrayList<>();
+ for (int i = 0; i < numTasks; i++) {
+ int val = (int)(101 - (loadMapping.get(i) * 100));
+ loads[i] = val;
+ localTotal += val;
+ }
+
+ for (int i = 0; i < numTasks; i++) {
+ loadRate.add(loads[i] * 1.0 / localTotal);
+ }
+
for (int i = 0; i < numTasks; i++) {
int expected = numThreads * groupingExecutionsPerThread / numTasks;
assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
@@ -164,7 +180,7 @@ public class LoadAwareShuffleGroupingTest {
@Test
public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
- for (int trial = 0 ; trial < 100 ; trial++) {
+ for (int trial = 0 ; trial < 200 ; trial++) {
// just pick arbitrary number in 5 ~ 100
final int numTasks = new Random().nextInt(96) + 5;
final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
@@ -185,8 +201,8 @@ public class LoadAwareShuffleGroupingTest {
.mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
Lists.newArrayList(1, 2), Collections.emptyMap());
int numMessages = 100000;
- int minPrCount = (int) (numMessages * 0.49);
- int maxPrCount = (int) (numMessages * 0.51);
+ int minPrCount = (int) (numMessages * (0.5 - ACCEPTABLE_MARGIN));
+ int maxPrCount = (int) (numMessages * (0.5 + ACCEPTABLE_MARGIN));
LoadMapping load = new LoadMapping();
Map<Integer, Double> loadInfoMap = new HashMap<>();
loadInfoMap.put(1, 0.0);
@@ -220,10 +236,10 @@ public class LoadAwareShuffleGroupingTest {
.mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
Lists.newArrayList(1, 2), Collections.emptyMap());
int numMessages = 100000;
- int min1PrCount = (int) (numMessages * 0.32);
- int max1PrCount = (int) (numMessages * 0.34);
- int min2PrCount = (int) (numMessages * 0.65);
- int max2PrCount = (int) (numMessages * 0.67);
+ int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
+ int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
+ int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
+ int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
LoadMapping load = new LoadMapping();
Map<Integer, Double> loadInfoMap = new HashMap<>();
loadInfoMap.put(1, 0.5);
@@ -337,8 +353,10 @@ public class LoadAwareShuffleGroupingTest {
// Task Id not used, so just pick a static value
int inputTaskId = 100;
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ // force triggers building ring
+ for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
@@ -379,7 +397,7 @@ public class LoadAwareShuffleGroupingTest {
int totalEmits = 5000 * numTasks;
int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
- int delta = (int) (totalEmits * 0.01);
+ int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
for (int i = 0; i < numTasks; i++) {
int expected = (int) (totalEmits * loadRate.get(i));
assertTrue("Distribution should respect the task load with small delta",
@@ -395,9 +413,6 @@ public class LoadAwareShuffleGroupingTest {
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
grouper.prepare(context, null, availableTaskIds);
- // triggers building distribution ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -431,9 +446,6 @@ public class LoadAwareShuffleGroupingTest {
// Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building ring
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
[2/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* add performance tests for multi-threads
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cab86d08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cab86d08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cab86d08
Branch: refs/heads/master
Commit: cab86d08f6f49afaca5b49d6ed08079f700b3737
Parents: 4842788
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 16:20:55 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 16:20:55 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGroupingTest.java | 200 +++++++++++++------
1 file changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cab86d08/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 410c1f9..bf2269f 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -250,6 +250,86 @@ public class LoadAwareShuffleGroupingTest {
assertTrue(load2 <= max2PrCount);
}
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksEvenLoadMapping(availableTaskIds));
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksUnevenLoadMapping(availableTaskIds));
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded()
+ throws ExecutionException, InterruptedException {
+ final int numTasks = 10;
+ final int numThreads = 2;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksEvenLoadMapping(availableTaskIds), numThreads);
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded()
+ throws ExecutionException, InterruptedException {
+ final int numTasks = 10;
+ final int numThreads = 2;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksUnevenLoadMapping(availableTaskIds), numThreads);
+ }
+
+ private List<Integer> getAvailableTaskIds(int numTasks) {
+ // this method should return sequential numbers starting at 0
+ final List<Integer> availableTaskIds = Lists.newArrayList();
+ for (int i = 0; i < numTasks; i++) {
+ availableTaskIds.add(i);
+ }
+ return availableTaskIds;
+ }
+
+ private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1);
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), Math.random());
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+
private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
int numTasks, LoadMapping loadMapping) {
int[] taskCounts = new int[numTasks];
@@ -307,72 +387,51 @@ public class LoadAwareShuffleGroupingTest {
}
}
- @Ignore
- @Test
- public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
- final int numTasks = 10;
- List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
- buildLocalTasksEvenLoadMapping(availableTaskIds));
- }
+ private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+ List<Integer> availableTaskIds, LoadMapping loadMapping) {
+ // Task Id not used, so just pick a static value
+ final int inputTaskId = 100;
- @Ignore
- @Test
- public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
- final int numTasks = 10;
- List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
- buildLocalTasksUnevenLoadMapping(availableTaskIds));
- }
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouper.prepare(context, null, availableTaskIds);
- private List<Integer> getAvailableTaskIds(int numTasks) {
- // this method should return sequential numbers starting at 0
- final List<Integer> availableTaskIds = Lists.newArrayList();
- for (int i = 0; i < numTasks; i++) {
- availableTaskIds.add(i);
- }
- return availableTaskIds;
- }
+ // triggers building distribution ring
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), 0.1);
- }
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
- }
+ long current = System.currentTimeMillis();
+ int idx = 0;
+ while (true) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+ idx++;
+ if (idx % 100000 == 0) {
+ // warm up 60 seconds
+ if (System.currentTimeMillis() - current >= 60_000) {
+ break;
+ }
+ }
}
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
- }
- private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), Math.random());
+ current = System.currentTimeMillis();
+ for (int i = 1; i <= 2_000_000_000 ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
}
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
+
+ LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
}
- private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
- List<Integer> availableTaskIds, LoadMapping loadMapping) {
+ private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
+ List<Integer> availableTaskIds, LoadMapping loadMapping, int numThreads)
+ throws InterruptedException, ExecutionException {
// Task Id not used, so just pick a static value
final int inputTaskId = 100;
- WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+ // Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building distribution ring
+ // triggers building ring
grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
long current = System.currentTimeMillis();
@@ -389,11 +448,40 @@ public class LoadAwareShuffleGroupingTest {
}
}
- current = System.currentTimeMillis();
- for (int i = 1; i <= 2_000_000_000 ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ final int groupingExecutionsPerThread = 2_000_000_000;
+
+ List<Callable<Long>> threadTasks = Lists.newArrayList();
+ for (int x = 0; x < numThreads; x++) {
+ Callable<Long> threadTask = new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ long current = System.currentTimeMillis();
+ for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
+ return System.currentTimeMillis() - current;
+ }
+ };
+
+ // Add to our collection.
+ threadTasks.add(threadTask);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+ List<Future<Long>> taskResults = executor.invokeAll(threadTasks);
+
+ // Wait for all tasks to complete
+ Long maxDurationMillis = 0L;
+ for (Future taskResult: taskResults) {
+ while (!taskResult.isDone()) {
+ Thread.sleep(100);
+ }
+ Long durationMillis = (Long) taskResult.get();
+ if (maxDurationMillis < durationMillis) {
+ maxDurationMillis = durationMillis;
+ }
}
- System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+ LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
}
}
\ No newline at end of file
[4/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* add a new way to provide LoadMapping: via push
* refresh load and push updated LoadMapping to all groupers when refreshLoadTimer is activated
* update tests to reflect the change
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1fd83a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1fd83a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1fd83a9
Branch: refs/heads/master
Commit: a1fd83a987995430269cd25ef360f703e7eb8f99
Parents: 8f63d5a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 7 23:51:39 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 7 23:51:39 2017 +0900
----------------------------------------------------------------------
.../org/apache/storm/daemon/GrouperFactory.java | 10 ++++++
.../org/apache/storm/daemon/worker/Worker.java | 11 ++++++-
.../jvm/org/apache/storm/executor/Executor.java | 19 +++++++++++
.../apache/storm/executor/ExecutorShutdown.java | 6 ++++
.../apache/storm/executor/IRunningExecutor.java | 2 ++
.../grouping/LoadAwareCustomStreamGrouping.java | 1 +
.../grouping/LoadAwareShuffleGrouping.java | 28 +++-------------
.../grouping/LoadAwareShuffleGroupingTest.java | 34 ++++++++++++++++----
8 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 7c61136..03e029e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -119,6 +119,11 @@ public class GrouperFactory {
}
@Override
+ public void refreshLoad(LoadMapping loadMapping) {
+
+ }
+
+ @Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
return customStreamGrouping.chooseTasks(taskId, values);
}
@@ -224,6 +229,11 @@ public class GrouperFactory {
// A no-op grouper
public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
@Override
+ public void refreshLoad(LoadMapping loadMapping) {
+
+ }
+
+ @Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 59e86c7..0b2ab40 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -265,7 +265,7 @@ public class Worker implements Shutdownable, DaemonCommon {
// The jitter allows the clients to get the data at different times, and avoids thundering herd
if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
}
workerState.refreshConnectionsTimer.scheduleRecurring(0,
@@ -285,6 +285,15 @@ public class Worker implements Shutdownable, DaemonCommon {
}
+ public void doRefreshLoad() {
+ workerState.refreshLoad();
+
+ final List<IRunningExecutor> executors = executorsAtom.get();
+ for (IRunningExecutor executor : executors) {
+ executor.loadChanged(workerState.getLoadMapping());
+ }
+ }
+
public void doHeartBeat() throws IOException {
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index c1c6350..5f2adf7 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -25,9 +25,11 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,6 +56,7 @@ import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.stats.BoltExecutorStats;
@@ -79,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
public abstract class Executor implements Callable, EventHandler<Object> {
@@ -102,6 +106,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
protected CommonStats stats;
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+ protected final List<LoadAwareCustomStreamGrouping> groupers;
protected final ReportErrorAndDie reportErrorDie;
protected final Callable<Boolean> sampler;
protected ExecutorTransfer executorTransfer;
@@ -162,6 +167,13 @@ public abstract class Executor implements Callable, EventHandler<Object> {
this.intervalToTaskToMetricToRegistry = new HashMap<>();
this.taskToComponent = workerData.getTaskToComponent();
this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, topoConf);
+ if (this.streamToComponentToGrouper != null) {
+ this.groupers = streamToComponentToGrouper.values().stream()
+ .filter(Objects::nonNull)
+ .flatMap(m -> m.values().stream()).collect(Collectors.toList());
+ } else {
+ this.groupers = Collections.emptyList();
+ }
this.reportError = new ReportError(topoConf, stormClusterState, stormId, componentId, workerTopologyContext);
this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
this.sampler = ConfigUtils.mkStatsSampler(topoConf);
@@ -340,6 +352,12 @@ public abstract class Executor implements Callable, EventHandler<Object> {
}
}
+ public void reflectNewLoadMapping(LoadMapping loadMapping) {
+ for (LoadAwareCustomStreamGrouping g : groupers) {
+ g.refreshLoad(loadMapping);
+ }
+ }
+
private void registerBackpressure() {
receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
@Override
@@ -589,4 +607,5 @@ public abstract class Executor implements Callable, EventHandler<Object> {
}
return ret;
}
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 144ee1b..7ea48b0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -23,6 +23,7 @@ import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.Task;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.spout.ISpout;
import org.apache.storm.task.IBolt;
@@ -69,6 +70,11 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
}
@Override
+ public void loadChanged(LoadMapping loadMapping) {
+ executor.reflectNewLoadMapping(loadMapping);
+ }
+
+ @Override
public boolean getBackPressureFlag() {
return executor.getBackpressure();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index 441fdff..e7a4117 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -19,6 +19,7 @@ package org.apache.storm.executor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
import java.util.List;
@@ -27,5 +28,6 @@ public interface IRunningExecutor {
ExecutorStats renderStats();
List<Long> getExecutorId();
void credentialsChanged(Credentials credentials);
+ void loadChanged(LoadMapping loadMapping);
boolean getBackPressureFlag();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index d825d2e..fba7254 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -20,5 +20,6 @@ package org.apache.storm.grouping;
import java.util.List;
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
+ void refreshLoad(LoadMapping loadMapping);
List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 84e4612..20437a1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -33,9 +33,6 @@ import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
private static final int CAPACITY_TASK_MULTIPLICATION = 100;
- @VisibleForTesting
- static final int CHECK_UPDATE_INDEX = 100;
-
private Random random;
private List<Integer>[] rets;
private int[] targets;
@@ -43,10 +40,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private AtomicInteger current;
private int actualCapacity = 0;
- private AtomicInteger skipCheckingUpdateCount;
- private AtomicBoolean isUpdating;
- private long lastUpdate = 0;
-
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
random = new Random();
@@ -70,8 +63,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
Collections.shuffle(choices, random);
current = new AtomicInteger(0);
- skipCheckingUpdateCount = new AtomicInteger(0);
- isUpdating = new AtomicBoolean(false);
}
@Override
@@ -80,21 +71,12 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
- skipCheckingUpdateCount.set(0);
- if ((lastUpdate + 1000) < System.currentTimeMillis()
- && isUpdating.compareAndSet(false, true)) {
- // before finishing updateRing(), concurrent call will still rely on old choices
- updateRing(load);
-
- // update time and open a chance to update ring again
- lastUpdate = System.currentTimeMillis();
- skipCheckingUpdateCount.set(0);
- isUpdating.set(false);
- }
- }
+ public void refreshLoad(LoadMapping loadMapping) {
+ updateRing(loadMapping);
+ }
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
int rightNow;
int size = choices.size();
while (true) {
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 667f36d..ed08a05 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -18,10 +18,13 @@
package org.apache.storm.grouping;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.storm.StormTimer;
import org.apache.storm.daemon.GrouperFactory;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -38,6 +41,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -90,9 +96,7 @@ public class LoadAwareShuffleGroupingTest {
grouper.prepare(context, null, availableTaskIds);
// force triggers building ring
- for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- }
+ grouper.refreshLoad(loadMapping);
// calling chooseTasks should be finished before refreshing ring
// adjusting groupingExecutionsPerThread might be needed with really slow machine
@@ -209,6 +213,9 @@ public class LoadAwareShuffleGroupingTest {
loadInfoMap.put(2, 0.0);
load.setLocal(loadInfoMap);
+ // force triggers building ring
+ shuffler.refreshLoad(load);
+
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3];
for (int i = 0 ; i < numMessages ; i++) {
@@ -246,6 +253,9 @@ public class LoadAwareShuffleGroupingTest {
loadInfoMap.put(2, 0.0);
load.setLocal(loadInfoMap);
+ // force triggers building ring
+ shuffler.refreshLoad(load);
+
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3]; // task id starts from 1
for (int i = 0 ; i < numMessages ; i++) {
@@ -354,9 +364,7 @@ public class LoadAwareShuffleGroupingTest {
int inputTaskId = 100;
// force triggers building ring
- for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- }
+ grouper.refreshLoad(loadMapping);
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
@@ -413,6 +421,11 @@ public class LoadAwareShuffleGroupingTest {
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
grouper.prepare(context, null, availableTaskIds);
+ // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+ ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(1));
+ refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -433,6 +446,8 @@ public class LoadAwareShuffleGroupingTest {
}
LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
+
+ refreshService.shutdownNow();
}
private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
@@ -446,6 +461,11 @@ public class LoadAwareShuffleGroupingTest {
// Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
+ // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+ ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(1));
+ refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -495,5 +515,7 @@ public class LoadAwareShuffleGroupingTest {
}
LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
+
+ refreshService.shutdownNow();
}
}
\ No newline at end of file
[5/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* change everything to Array and pre-allocate all
* use static length for choices
* prepare backup array for choices pre-allocated and swap to avoid allocating arrays
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08038b6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08038b6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08038b6d
Branch: refs/heads/master
Commit: 08038b6d65462788796348e40adbacfa6d2f9467
Parents: a1fd83a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 9 07:18:21 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 9 13:23:50 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGrouping.java | 103 ++++++++++++-------
.../grouping/LoadAwareShuffleGroupingTest.java | 27 ++---
2 files changed, 77 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 20437a1..080c48d 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -27,18 +27,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
- private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+ private static final int CAPACITY = 1000;
private Random random;
private List<Integer>[] rets;
private int[] targets;
- private ArrayList<List<Integer>> choices;
+ private int[] loads;
+ private int[] unassigned;
+ private int[] choices;
+ private int[] prepareChoices;
private AtomicInteger current;
- private int actualCapacity = 0;
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -51,18 +54,22 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
targets[i] = targetTasks.get(i);
}
- actualCapacity = targets.length * CAPACITY_TASK_MULTIPLICATION;
-
// can't leave choices to be empty, so initiate it similar as ShuffleGrouping
- choices = new ArrayList<>(actualCapacity);
- int index = 0;
- while (index < actualCapacity) {
- choices.add(rets[index % targets.length]);
- index++;
+ choices = new int[CAPACITY];
+
+ for (int i = 0 ; i < CAPACITY ; i++) {
+ choices[i] = i % rets.length;
}
- Collections.shuffle(choices, random);
+ shuffleArray(choices);
current = new AtomicInteger(0);
+
+ // allocate another array to be switched
+ prepareChoices = new int[CAPACITY];
+
+ // allocating only once
+ loads = new int[targets.length];
+ unassigned = new int[targets.length];
}
@Override
@@ -78,14 +85,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
int rightNow;
- int size = choices.size();
while (true) {
rightNow = current.incrementAndGet();
- if (rightNow < size) {
- return choices.get(rightNow);
- } else if (rightNow == size) {
+ if (rightNow < CAPACITY) {
+ return rets[choices[rightNow]];
+ } else if (rightNow == CAPACITY) {
current.set(0);
- return choices.get(0);
+ return rets[choices[0]];
}
//race condition with another thread, and we lost
// try again
@@ -94,45 +100,70 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private void updateRing(LoadMapping load) {
int localTotal = 0;
- int[] loads = new int[targets.length];
for (int i = 0 ; i < targets.length; i++) {
int val = (int)(101 - (load.get(targets[i]) * 100));
loads[i] = val;
localTotal += val;
}
- // allocating enough memory doesn't hurt much, so assign aggressively
- // we will cut out if actual size becomes bigger than actualCapacity
- ArrayList<List<Integer>> newChoices = new ArrayList<>(actualCapacity + targets.length);
+ int currentIdx = 0;
+ int unassignedIdx = 0;
for (int i = 0 ; i < loads.length ; i++) {
+ if (currentIdx == CAPACITY) {
+ break;
+ }
+
int loadForTask = loads[i];
- int amount = loadForTask * actualCapacity / localTotal;
+ int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
// assign at least one for task
if (amount == 0) {
- amount = 1;
+ unassigned[unassignedIdx++] = i;
}
for (int j = 0; j < amount; j++) {
- newChoices.add(rets[i]);
+ if (currentIdx == CAPACITY) {
+ break;
+ }
+
+ prepareChoices[currentIdx++] = i;
}
}
- Collections.shuffle(newChoices, random);
-
- // make sure length of newChoices is same as actualCapacity, like current choices
- // this ensures safety when requests and update occurs concurrently
- if (newChoices.size() > actualCapacity) {
- newChoices.subList(actualCapacity, newChoices.size()).clear();
- } else if (newChoices.size() < actualCapacity) {
- int remaining = actualCapacity - newChoices.size();
- while (remaining > 0) {
- newChoices.add(newChoices.get(remaining % newChoices.size()));
- remaining--;
+ if (currentIdx < CAPACITY) {
+ // if there're some rooms, give unassigned tasks a chance to be included
+ // this should be really small amount, so just add them sequentially
+ if (unassignedIdx > 0) {
+ for (int i = currentIdx ; i < CAPACITY ; i++) {
+ prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
+ }
+ } else {
+ // just pick random
+ for (int i = currentIdx ; i < CAPACITY ; i++) {
+ prepareChoices[i] = random.nextInt(loads.length);
+ }
}
}
- assert newChoices.size() == actualCapacity;
+ shuffleArray(prepareChoices);
+
+ // swapping two arrays
+ int[] tempForSwap = choices;
+ choices = prepareChoices;
+ prepareChoices = tempForSwap;
- choices = newChoices;
current.set(0);
}
+
+ private void shuffleArray(int[] arr) {
+ int size = arr.length;
+ for (int i = size; i > 1; i--) {
+ swap(arr, i - 1, random.nextInt(i));
+ }
+ }
+
+ private void swap(int[] arr, int i, int j) {
+ int tmp = arr[i];
+ arr[i] = arr[j];
+ arr[j] = tmp;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index ed08a05..2d90d66 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -71,12 +71,14 @@ public class LoadAwareShuffleGroupingTest {
// distribution should be even for all nodes when loads are even
int desiredTaskCountPerTask = 5000;
int totalEmits = numTasks * desiredTaskCountPerTask;
+ int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+ int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
for (int i = 0; i < numTasks; i++) {
- assertEquals("Distribution should be even for all nodes", desiredTaskCountPerTask,
- taskCounts[i]);
+ assertTrue("Distribution should be even for all nodes with small delta",
+ taskCounts[i] >= minPrCount && taskCounts[i] <= maxPrCount);
}
}
@@ -106,6 +108,7 @@ public class LoadAwareShuffleGroupingTest {
// distribution should be exactly even
final int groupingExecutionsPerThread = numTasks * 5000;
final int numThreads = 10;
+ int totalEmits = groupingExecutionsPerThread * numThreads;
List<Callable<int[]>> threadTasks = Lists.newArrayList();
for (int x = 0; x < numThreads; x++) {
@@ -149,22 +152,12 @@ public class LoadAwareShuffleGroupingTest {
}
}
- int[] loads = new int[numTasks];
- int localTotal = 0;
- List<Double> loadRate = new ArrayList<>();
- for (int i = 0; i < numTasks; i++) {
- int val = (int)(101 - (loadMapping.get(i) * 100));
- loads[i] = val;
- localTotal += val;
- }
-
- for (int i = 0; i < numTasks; i++) {
- loadRate.add(loads[i] * 1.0 / localTotal);
- }
+ int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+ int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
for (int i = 0; i < numTasks; i++) {
- int expected = numThreads * groupingExecutionsPerThread / numTasks;
- assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
+ assertTrue("Distribution should be even for all nodes with small delta",
+ taskIdTotals[i] >= minPrCount && taskIdTotals[i] <= maxPrCount);
}
}
@@ -441,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
}
current = System.currentTimeMillis();
- for (int i = 1; i <= 2_000_000_000 ; i++) {
+ for (int i = 1; i <= 2_000_000_000; i++) {
grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
}
[6/8] storm git commit: STORM-2678 Improve performance of
LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* Let chooseTasks() read from index 0, not 1
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab38a7a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab38a7a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab38a7a0
Branch: refs/heads/master
Commit: ab38a7a0c87a857f9a9021d9805d8d63c794ef74
Parents: 08038b6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 23 23:42:23 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 23 23:42:23 2017 +0900
----------------------------------------------------------------------
.../jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ab38a7a0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 080c48d..97c5ce1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -62,7 +62,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
shuffleArray(choices);
- current = new AtomicInteger(0);
+ current = new AtomicInteger(-1);
// allocate another array to be switched
prepareChoices = new int[CAPACITY];
@@ -150,7 +150,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
choices = prepareChoices;
prepareChoices = tempForSwap;
- current.set(0);
+ current.set(-1);
}
private void shuffleArray(int[] arr) {
[7/8] storm git commit: STORM-2678 Improve performance of
LoadAwareShuffleGrouping
Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* remove the LoadAware-specific chooseTasks(taskId, values, load) method
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dade36c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dade36c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dade36c
Branch: refs/heads/master
Commit: 4dade36c6200a1fea06aa207f3f6d8470b725d0f
Parents: ab38a7a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 24 09:15:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 24 09:15:27 2017 +0900
----------------------------------------------------------------------
.../org/apache/storm/daemon/GrouperFactory.java | 10 ----------
.../src/jvm/org/apache/storm/daemon/Task.java | 2 +-
.../grouping/LoadAwareCustomStreamGrouping.java | 3 ---
.../storm/grouping/LoadAwareShuffleGrouping.java | 15 +++++----------
.../grouping/LoadAwareShuffleGroupingTest.java | 17 ++++++++---------
.../test/clj/org/apache/storm/grouping_test.clj | 2 +-
6 files changed, 15 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 03e029e..179e882 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -124,11 +124,6 @@ public class GrouperFactory {
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- return customStreamGrouping.chooseTasks(taskId, values);
- }
-
- @Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
customStreamGrouping.prepare(context, stream, targetTasks);
}
@@ -234,11 +229,6 @@ public class GrouperFactory {
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- return null;
- }
-
- @Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index b79a259..0bc56fb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -137,7 +137,7 @@ public class Task {
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
- List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+ List<Integer> compTasks = grouper.chooseTasks(taskId, values);
outTasks.addAll(compTasks);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index fba7254..6e5a3fd 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -17,9 +17,6 @@
*/
package org.apache.storm.grouping;
-import java.util.List;
-
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
void refreshLoad(LoadMapping loadMapping);
- List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 97c5ce1..0c0560a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -74,16 +74,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
- throw new RuntimeException("NOT IMPLEMENTED");
- }
-
- @Override
- public void refreshLoad(LoadMapping loadMapping) {
- updateRing(loadMapping);
- }
-
- @Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
int rightNow;
while (true) {
rightNow = current.incrementAndGet();
@@ -98,6 +88,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
}
+ @Override
+ public void refreshLoad(LoadMapping loadMapping) {
+ updateRing(loadMapping);
+ }
+
private void updateRing(LoadMapping load) {
int localTotal = 0;
for (int i = 0 ; i < targets.length; i++) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2d90d66..53ac404 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -117,8 +117,7 @@ public class LoadAwareShuffleGroupingTest {
public int[] call() throws Exception {
int[] taskCounts = new int[availableTaskIds.size()];
for (int i = 1; i <= groupingExecutionsPerThread; i++) {
- List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
- Lists.newArrayList(), loadMapping);
+ List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
@@ -212,7 +211,7 @@ public class LoadAwareShuffleGroupingTest {
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3];
for (int i = 0 ; i < numMessages ; i++) {
- List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ List<Integer> tasks = shuffler.chooseTasks(1, data);
for (int task : tasks) {
frequencies[task]++;
}
@@ -252,7 +251,7 @@ public class LoadAwareShuffleGroupingTest {
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3]; // task id starts from 1
for (int i = 0 ; i < numMessages ; i++) {
- List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+ List<Integer> tasks = shuffler.chooseTasks(1, data);
for (int task : tasks) {
frequencies[task]++;
}
@@ -361,7 +360,7 @@ public class LoadAwareShuffleGroupingTest {
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
- .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ .chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
@@ -422,7 +421,7 @@ public class LoadAwareShuffleGroupingTest {
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
idx++;
if (idx % 100000 == 0) {
@@ -435,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
current = System.currentTimeMillis();
for (int i = 1; i <= 2_000_000_000; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
}
LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
@@ -462,7 +461,7 @@ public class LoadAwareShuffleGroupingTest {
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
idx++;
if (idx % 100000 == 0) {
@@ -482,7 +481,7 @@ public class LoadAwareShuffleGroupingTest {
public Long call() throws Exception {
long current = System.currentTimeMillis();
for (int i = 1; i <= groupingExecutionsPerThread; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList());
}
return System.currentTimeMillis() - current;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 68ebb4e..c216008 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -32,7 +32,7 @@
min-prcnt (int (* num-messages 0.49))
max-prcnt (int (* num-messages 0.51))
data [1 2]
- freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
+ freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data)))
load1 (.get freq [(int 1)])
load2 (.get freq [(int 2)])]
(log-message "FREQ:" freq)