You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:20 UTC

[2/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

STORM-2678 Improve performance of LoadAwareShuffleGrouping

* add multi-threaded performance tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cab86d08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cab86d08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cab86d08

Branch: refs/heads/master
Commit: cab86d08f6f49afaca5b49d6ed08079f700b3737
Parents: 4842788
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 16:20:55 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 16:20:55 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 200 +++++++++++++------
 1 file changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cab86d08/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 410c1f9..bf2269f 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -250,6 +250,86 @@ public class LoadAwareShuffleGroupingTest {
         assertTrue(load2 <= max2PrCount);
     }
 
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksEvenLoadMapping(availableTaskIds));
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksUnevenLoadMapping(availableTaskIds));
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded()
+        throws ExecutionException, InterruptedException {
+        final int numTasks = 10;
+        final int numThreads = 2;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksEvenLoadMapping(availableTaskIds), numThreads);
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded()
+        throws ExecutionException, InterruptedException {
+        final int numTasks = 10;
+        final int numThreads = 2;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksUnevenLoadMapping(availableTaskIds), numThreads);
+    }
+
+    private List<Integer> getAvailableTaskIds(int numTasks) {
+        // this method should return sequential numbers starting at 0
+        final List<Integer> availableTaskIds = Lists.newArrayList();
+        for (int i = 0; i < numTasks; i++) {
+            availableTaskIds.add(i);
+        }
+        return availableTaskIds;
+    }
+
+    private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1);
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), Math.random());
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+
     private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
         int numTasks, LoadMapping loadMapping) {
         int[] taskCounts = new int[numTasks];
@@ -307,72 +387,51 @@ public class LoadAwareShuffleGroupingTest {
         }
     }
 
-    @Ignore
-    @Test
-    public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
-        final int numTasks = 10;
-        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
-            buildLocalTasksEvenLoadMapping(availableTaskIds));
-    }
+    private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+        List<Integer> availableTaskIds, LoadMapping loadMapping) {
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
 
-    @Ignore
-    @Test
-    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
-        final int numTasks = 10;
-        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
-            buildLocalTasksUnevenLoadMapping(availableTaskIds));
-    }
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
 
-    private List<Integer> getAvailableTaskIds(int numTasks) {
-        // this method should return sequential numbers starting at 0
-        final List<Integer> availableTaskIds = Lists.newArrayList();
-        for (int i = 0; i < numTasks; i++) {
-            availableTaskIds.add(i);
-        }
-        return availableTaskIds;
-    }
+        // triggers building distribution ring
+        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
-    private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), 0.1);
-        }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
-    }
+        long current = System.currentTimeMillis();
+        int idx = 0;
+        while (true) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
-    private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+            idx++;
+            if (idx % 100000 == 0) {
+                // warm up 60 seconds
+                if (System.currentTimeMillis() - current >= 60_000) {
+                    break;
+                }
+            }
         }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
-    }
 
-    private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), Math.random());
+        current = System.currentTimeMillis();
+        for (int i = 1; i <= 2_000_000_000 ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
         }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
+
+        LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
     }
 
-    private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
-        List<Integer> availableTaskIds, LoadMapping loadMapping) {
+    private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
+        List<Integer> availableTaskIds, LoadMapping loadMapping, int numThreads)
+        throws InterruptedException, ExecutionException {
         // Task Id not used, so just pick a static value
         final int inputTaskId = 100;
 
-        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+        // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building distribution ring
+        // triggers building ring
         grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
         long current = System.currentTimeMillis();
@@ -389,11 +448,40 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
-        current = System.currentTimeMillis();
-        for (int i = 1; i <= 2_000_000_000 ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        final int groupingExecutionsPerThread = 2_000_000_000;
+
+        List<Callable<Long>> threadTasks = Lists.newArrayList();
+        for (int x = 0; x < numThreads; x++) {
+            Callable<Long> threadTask = new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                    long current = System.currentTimeMillis();
+                    for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+                        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                    }
+                    return System.currentTimeMillis() - current;
+                }
+            };
+
+            // Add to our collection.
+            threadTasks.add(threadTask);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+        List<Future<Long>> taskResults = executor.invokeAll(threadTasks);
+
+        // Wait for all tasks to complete
+        Long maxDurationMillis = 0L;
+        for (Future taskResult: taskResults) {
+            while (!taskResult.isDone()) {
+                Thread.sleep(100);
+            }
+            Long durationMillis = (Long) taskResult.get();
+            if (maxDurationMillis < durationMillis) {
+                maxDurationMillis = durationMillis;
+            }
         }
 
-        System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+        LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
     }
 }
\ No newline at end of file