You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:20 UTC
[2/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* add performance tests for multi-threads
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cab86d08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cab86d08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cab86d08
Branch: refs/heads/master
Commit: cab86d08f6f49afaca5b49d6ed08079f700b3737
Parents: 4842788
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 16:20:55 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 16:20:55 2017 +0900
----------------------------------------------------------------------
.../grouping/LoadAwareShuffleGroupingTest.java | 200 +++++++++++++------
1 file changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cab86d08/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 410c1f9..bf2269f 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -250,6 +250,86 @@ public class LoadAwareShuffleGroupingTest {
assertTrue(load2 <= max2PrCount);
}
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksEvenLoadMapping(availableTaskIds));
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+ final int numTasks = 10;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksUnevenLoadMapping(availableTaskIds));
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded()
+ throws ExecutionException, InterruptedException {
+ final int numTasks = 10;
+ final int numThreads = 2;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksEvenLoadMapping(availableTaskIds), numThreads);
+ }
+
+ @Ignore
+ @Test
+ public void testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded()
+ throws ExecutionException, InterruptedException {
+ final int numTasks = 10;
+ final int numThreads = 2;
+ List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+ runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+ buildLocalTasksUnevenLoadMapping(availableTaskIds), numThreads);
+ }
+
+ private List<Integer> getAvailableTaskIds(int numTasks) {
+ // this method should return sequential numbers starting at 0
+ final List<Integer> availableTaskIds = Lists.newArrayList();
+ for (int i = 0; i < numTasks; i++) {
+ availableTaskIds.add(i);
+ }
+ return availableTaskIds;
+ }
+
+ private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1);
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+ private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+ LoadMapping loadMapping = new LoadMapping();
+ Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+ for (int i = 0; i < availableTasks.size(); i++) {
+ localLoadMap.put(availableTasks.get(i), Math.random());
+ }
+ loadMapping.setLocal(localLoadMap);
+ return loadMapping;
+ }
+
+
private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
int numTasks, LoadMapping loadMapping) {
int[] taskCounts = new int[numTasks];
@@ -307,72 +387,51 @@ public class LoadAwareShuffleGroupingTest {
}
}
- @Ignore
- @Test
- public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
- final int numTasks = 10;
- List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
- buildLocalTasksEvenLoadMapping(availableTaskIds));
- }
+ private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+ List<Integer> availableTaskIds, LoadMapping loadMapping) {
+ // Task Id not used, so just pick a static value
+ final int inputTaskId = 100;
- @Ignore
- @Test
- public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
- final int numTasks = 10;
- List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
- runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
- buildLocalTasksUnevenLoadMapping(availableTaskIds));
- }
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ grouper.prepare(context, null, availableTaskIds);
- private List<Integer> getAvailableTaskIds(int numTasks) {
- // this method should return sequential numbers starting at 0
- final List<Integer> availableTaskIds = Lists.newArrayList();
- for (int i = 0; i < numTasks; i++) {
- availableTaskIds.add(i);
- }
- return availableTaskIds;
- }
+ // triggers building distribution ring
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), 0.1);
- }
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
- }
+ long current = System.currentTimeMillis();
+ int idx = 0;
+ while (true) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+ idx++;
+ if (idx % 100000 == 0) {
+ // warm up 60 seconds
+ if (System.currentTimeMillis() - current >= 60_000) {
+ break;
+ }
+ }
}
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
- }
- private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
- LoadMapping loadMapping = new LoadMapping();
- Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
- for (int i = 0; i < availableTasks.size(); i++) {
- localLoadMap.put(availableTasks.get(i), Math.random());
+ current = System.currentTimeMillis();
+ for (int i = 1; i <= 2_000_000_000 ; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
}
- loadMapping.setLocal(localLoadMap);
- return loadMapping;
+
+ LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
}
- private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
- List<Integer> availableTaskIds, LoadMapping loadMapping) {
+ private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
+ List<Integer> availableTaskIds, LoadMapping loadMapping, int numThreads)
+ throws InterruptedException, ExecutionException {
// Task Id not used, so just pick a static value
final int inputTaskId = 100;
- WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+ // Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
- // triggers building distribution ring
+ // triggers building ring
grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
long current = System.currentTimeMillis();
@@ -389,11 +448,40 @@ public class LoadAwareShuffleGroupingTest {
}
}
- current = System.currentTimeMillis();
- for (int i = 1; i <= 2_000_000_000 ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ final int groupingExecutionsPerThread = 2_000_000_000;
+
+ List<Callable<Long>> threadTasks = Lists.newArrayList();
+ for (int x = 0; x < numThreads; x++) {
+ Callable<Long> threadTask = new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ long current = System.currentTimeMillis();
+ for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+ grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+ }
+ return System.currentTimeMillis() - current;
+ }
+ };
+
+ // Add to our collection.
+ threadTasks.add(threadTask);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+ List<Future<Long>> taskResults = executor.invokeAll(threadTasks);
+
+ // Wait for all tasks to complete
+ Long maxDurationMillis = 0L;
+ for (Future taskResult: taskResults) {
+ while (!taskResult.isDone()) {
+ Thread.sleep(100);
+ }
+ Long durationMillis = (Long) taskResult.get();
+ if (maxDurationMillis < durationMillis) {
+ maxDurationMillis = durationMillis;
+ }
}
- System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+ LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
}
}
\ No newline at end of file