You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:19 UTC

[1/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Repository: storm
Updated Branches:
  refs/heads/master 9bf202277 -> dad14e414


STORM-2678 Improve performance of LoadAwareShuffleGrouping

* construct a ring that represents the distribution of tasks based on load
* chooseTasks() just accesses the ring sequentially
* port the related tests from Clojure to Java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48427880
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48427880
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48427880

Branch: refs/heads/master
Commit: 48427880a483e1d86f9dbe99ef46e395e3a40308
Parents: 77354fe
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 13:25:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 15:17:24 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 104 ++++-
 .../grouping/LoadAwareShuffleGroupingTest.java  | 399 +++++++++++++++++++
 .../test/clj/org/apache/storm/grouping_test.clj |  36 --
 3 files changed, 485 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 9a07194..b460a8a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -20,30 +20,51 @@ package org.apache.storm.grouping;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
+    private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
-    private int[] loads;
-    private int total;
+    private ArrayList<List<Integer>> choices;
+    private AtomicInteger current;
+    private int actualCapacity = 0;
+
     private long lastUpdate = 0;
+    private AtomicBoolean isUpdating = new AtomicBoolean(false);
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         random = new Random();
+
         rets = (List<Integer>[])new List<?>[targetTasks.size()];
         targets = new int[targetTasks.size()];
         for (int i = 0; i < targets.length; i++) {
             rets[i] = Arrays.asList(targetTasks.get(i));
             targets[i] = targetTasks.get(i);
         }
-        loads = new int[targets.length];
+
+        actualCapacity = targets.length * CAPACITY_TASK_MULTIPLICATION;
+
+        // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
+        choices = new ArrayList<>(actualCapacity);
+        int index = 0;
+        while (index < actualCapacity) {
+            choices.add(rets[index % targets.length]);
+            index++;
+        }
+
+        Collections.shuffle(choices, random);
+        current = new AtomicInteger(0);
     }
 
     @Override
@@ -53,24 +74,71 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-        if ((lastUpdate + 1000) < System.currentTimeMillis()) {
-            int local_total = 0;
-            for (int i = 0; i < targets.length; i++) {
-                int val = (int)(101 - (load.get(targets[i]) * 100));
-                loads[i] = val;
-                local_total += val;
-            }
-            total = local_total;
+        if ((lastUpdate + 1000) < System.currentTimeMillis()
+            && isUpdating.compareAndSet(false, true)) {
+            // update time earlier to reduce chance to do CAS
+            // concurrent call can still rely on old choices
             lastUpdate = System.currentTimeMillis();
+            updateRing(load);
+            isUpdating.set(false);
         }
-        int selected = random.nextInt(total);
-        int sum = 0;
-        for (int i = 0; i < targets.length; i++) {
-            sum += loads[i];
-            if (selected < sum) {
-                return rets[i];
+
+        int rightNow;
+        int size = choices.size();
+        while (true) {
+            rightNow = current.incrementAndGet();
+            if (rightNow < size) {
+                return choices.get(rightNow);
+            } else if (rightNow == size) {
+                current.set(0);
+                return choices.get(0);
             }
+            //race condition with another thread, and we lost
+            // try again
         }
-        return rets[rets.length-1];
+    }
+
+    private void updateRing(LoadMapping load) {
+        int localTotal = 0;
+        int[] loads = new int[targets.length];
+        for (int i = 0 ; i < targets.length; i++) {
+            int val = (int)(101 - (load.get(targets[i]) * 100));
+            loads[i] = val;
+            localTotal += val;
+        }
+
+        // allocating enough memory doesn't hurt much, so assign aggressively
+        // we will cut out if actual size becomes bigger than actualCapacity
+        ArrayList<List<Integer>> newChoices = new ArrayList<>(actualCapacity + targets.length);
+        for (int i = 0 ; i < loads.length ; i++) {
+            int loadForTask = loads[i];
+            int amount = loadForTask * actualCapacity / localTotal;
+            // assign at least one for task
+            if (amount == 0) {
+                amount = 1;
+            }
+            for (int j = 0; j < amount; j++) {
+                newChoices.add(rets[i]);
+            }
+        }
+
+        Collections.shuffle(newChoices, random);
+
+        // make sure length of newChoices is same as actualCapacity, like current choices
+        // this ensures safety when requests and update occurs concurrently
+        if (newChoices.size() > actualCapacity) {
+            newChoices.subList(actualCapacity, newChoices.size()).clear();
+        } else if (newChoices.size() < actualCapacity) {
+            int remaining = actualCapacity - newChoices.size();
+            while (remaining > 0) {
+                newChoices.add(newChoices.get(remaining % newChoices.size()));
+                remaining--;
+            }
+        }
+
+        assert newChoices.size() == actualCapacity;
+
+        choices = newChoices;
+        current.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
new file mode 100644
index 0000000..410c1f9
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.NullStruct;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class LoadAwareShuffleGroupingTest {
+    private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithEvenLoad() {
+        // just pick arbitrary number
+        final int numTasks = 7;
+        final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+        // Define our taskIds and loads
+        final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
+
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
+
+        // Keep track of how many times we see each taskId
+        // distribution should be even for all nodes when loads are even
+        int desiredTaskCountPerTask = 5000;
+        int totalEmits = numTasks * desiredTaskCountPerTask;
+
+        int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
+
+        for (int i = 0; i < numTasks; i++) {
+            assertEquals("Distribution should be even for all nodes", desiredTaskCountPerTask,
+                taskCounts[i]);
+        }
+    }
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithEvenLoadMultiThreaded() throws InterruptedException, ExecutionException {
+        final int numTasks = 7;
+
+        final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
+        // Define our taskIds - the test expects these to be incrementing by one up from zero
+        final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
+
+        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+        // Call prepare with our available taskIds
+        grouper.prepare(context, null, availableTaskIds);
+
+        // triggers building ring
+        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+        // calling chooseTasks should be finished before refreshing ring
+        // adjusting groupingExecutionsPerThread might be needed with really slow machine
+        // we allow race condition between refreshing ring and choosing tasks
+        // so it will not make exact even distribution, though diff is expected to be small
+        final int groupingExecutionsPerThread = numTasks * 5000;
+        final int numThreads = 10;
+
+        List<Callable<int[]>> threadTasks = Lists.newArrayList();
+        for (int x = 0; x < numThreads; x++) {
+            Callable<int[]> threadTask = new Callable<int[]>() {
+                @Override
+                public int[] call() throws Exception {
+                    int[] taskCounts = new int[availableTaskIds.size()];
+                    for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
+                            Lists.newArrayList(), loadMapping);
+
+                        // Validate a single task id return
+                        assertNotNull("Not null taskId list returned", taskIds);
+                        assertEquals("Single task Id returned", 1, taskIds.size());
+
+                        int taskId = taskIds.get(0);
+
+                        assertTrue("TaskId should exist", taskId >= 0 && taskId < availableTaskIds.size());
+                        taskCounts[taskId]++;
+                    }
+                    return taskCounts;
+                }
+            };
+
+            // Add to our collection.
+            threadTasks.add(threadTask);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+        List<Future<int[]>> taskResults = executor.invokeAll(threadTasks);
+
+        // Wait for all tasks to complete
+        int[] taskIdTotals = new int[numTasks];
+        for (Future taskResult: taskResults) {
+            while (!taskResult.isDone()) {
+                Thread.sleep(1000);
+            }
+            int[] taskDistributions = (int[]) taskResult.get();
+            for (int i = 0; i < taskDistributions.length; i++) {
+                taskIdTotals[i] += taskDistributions[i];
+            }
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            int expected = numThreads * groupingExecutionsPerThread / numTasks;
+            assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
+        }
+    }
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithUnevenLoad() {
+        // just pick arbitrary number
+        final int numTasks = 7;
+        final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+        // Define our taskIds and loads
+        final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        final LoadMapping loadMapping = buildLocalTasksUnevenLoadMapping(availableTaskIds);
+
+        runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
+            loadMapping);
+    }
+
+    @Test
+    public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
+        for (int trial = 0 ; trial < 100 ; trial++) {
+            // just pick arbitrary number in 5 ~ 100
+            final int numTasks = new Random().nextInt(96) + 5;
+            final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
+
+            // Define our taskIds and loads
+            final List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+            final LoadMapping loadMapping = buildLocalTasksRandomLoadMapping(availableTaskIds);
+
+            runDistributionVerificationTestWithUnevenLoad(numTasks, grouper, availableTaskIds,
+                loadMapping);
+        }
+    }
+
+    @Test
+    public void testShuffleLoadEven() {
+        // port test-shuffle-load-even
+        LoadAwareCustomStreamGrouping shuffler = GrouperFactory
+            .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
+                Lists.newArrayList(1, 2), Collections.emptyMap());
+        int numMessages = 100000;
+        int minPrCount = (int) (numMessages * 0.49);
+        int maxPrCount = (int) (numMessages * 0.51);
+        LoadMapping load = new LoadMapping();
+        Map<Integer, Double> loadInfoMap = new HashMap<>();
+        loadInfoMap.put(1, 0.0);
+        loadInfoMap.put(2, 0.0);
+        load.setLocal(loadInfoMap);
+
+        List<Object> data = Lists.newArrayList(1, 2);
+        int[] frequencies = new int[3];
+        for (int i = 0 ; i < numMessages ; i++) {
+            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            for (int task : tasks) {
+                frequencies[task]++;
+            }
+        }
+
+        int load1 = frequencies[1];
+        int load2 = frequencies[2];
+
+        LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
+
+        assertTrue(load1 >= minPrCount);
+        assertTrue(load1 <= maxPrCount);
+        assertTrue(load2 >= minPrCount);
+        assertTrue(load2 <= maxPrCount);
+    }
+
+    @Test
+    public void testShuffleLoadUneven() {
+        // port test-shuffle-load-uneven
+        LoadAwareCustomStreamGrouping shuffler = GrouperFactory
+            .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
+                Lists.newArrayList(1, 2), Collections.emptyMap());
+        int numMessages = 100000;
+        int min1PrCount = (int) (numMessages * 0.32);
+        int max1PrCount = (int) (numMessages * 0.34);
+        int min2PrCount = (int) (numMessages * 0.65);
+        int max2PrCount = (int) (numMessages * 0.67);
+        LoadMapping load = new LoadMapping();
+        Map<Integer, Double> loadInfoMap = new HashMap<>();
+        loadInfoMap.put(1, 0.5);
+        loadInfoMap.put(2, 0.0);
+        load.setLocal(loadInfoMap);
+
+        List<Object> data = Lists.newArrayList(1, 2);
+        int[] frequencies = new int[3]; // task id starts from 1
+        for (int i = 0 ; i < numMessages ; i++) {
+            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            for (int task : tasks) {
+                frequencies[task]++;
+            }
+        }
+
+        int load1 = frequencies[1];
+        int load2 = frequencies[2];
+
+        LOG.info("Frequency info: load1 = {}, load2 = {}", load1, load2);
+
+        assertTrue(load1 >= min1PrCount);
+        assertTrue(load1 <= max1PrCount);
+        assertTrue(load2 >= min2PrCount);
+        assertTrue(load2 <= max2PrCount);
+    }
+
+    private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
+        int numTasks, LoadMapping loadMapping) {
+        int[] taskCounts = new int[numTasks];
+
+        // Task Id not used, so just pick a static value
+        int inputTaskId = 100;
+
+        // triggers building ring
+        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+        for (int i = 1; i <= totalEmits; i++) {
+            List<Integer> taskIds = grouper
+                .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+            // Validate a single task id return
+            assertNotNull("Not null taskId list returned", taskIds);
+            assertEquals("Single task Id returned", 1, taskIds.size());
+
+            int taskId = taskIds.get(0);
+
+            assertTrue("TaskId should exist", taskId >= 0 && taskId < numTasks);
+            taskCounts[taskId]++;
+        }
+        return taskCounts;
+    }
+
+    private void runDistributionVerificationTestWithUnevenLoad(int numTasks,
+        LoadAwareShuffleGrouping grouper, List<Integer> availableTaskIds,
+        LoadMapping loadMapping) {
+        int[] loads = new int[numTasks];
+        int localTotal = 0;
+        List<Double> loadRate = new ArrayList<>();
+        for (int i = 0; i < numTasks; i++) {
+            int val = (int)(101 - (loadMapping.get(i) * 100));
+            loads[i] = val;
+            localTotal += val;
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            loadRate.add(loads[i] * 1.0 / localTotal);
+        }
+
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
+
+        // Keep track of how many times we see each taskId
+        int totalEmits = 5000 * numTasks;
+        int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
+
+        int delta = (int) (totalEmits * 0.01);
+        for (int i = 0; i < numTasks; i++) {
+            int expected = (int) (totalEmits * loadRate.get(i));
+            assertTrue("Distribution should respect the task load with small delta",
+                taskCounts[i] >= expected - delta && taskCounts[i] <= expected + delta);
+        }
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksEvenLoadMapping(availableTaskIds));
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksUnevenLoadMapping(availableTaskIds));
+    }
+
+    private List<Integer> getAvailableTaskIds(int numTasks) {
+        // this method should return sequential numbers starting at 0
+        final List<Integer> availableTaskIds = Lists.newArrayList();
+        for (int i = 0; i < numTasks; i++) {
+            availableTaskIds.add(i);
+        }
+        return availableTaskIds;
+    }
+
+    private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1);
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), Math.random());
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+        List<Integer> availableTaskIds, LoadMapping loadMapping) {
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
+
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
+
+        // triggers building distribution ring
+        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+        long current = System.currentTimeMillis();
+        int idx = 0;
+        while (true) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+
+            idx++;
+            if (idx % 100000 == 0) {
+                // warm up 60 seconds
+                if (System.currentTimeMillis() - current >= 60_000) {
+                    break;
+                }
+            }
+        }
+
+        current = System.currentTimeMillis();
+        for (int i = 1; i <= 2_000_000_000 ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        }
+
+        System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/48427880/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 9bfb8a7..68ebb4e 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -41,42 +41,6 @@
     (is (>= load2 min-prcnt))
     (is (<= load2 max-prcnt))))
 
-(deftest test-shuffle-load-even
- (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
-       num-messages 100000
-       min-prcnt (int (* num-messages 0.49))
-       max-prcnt (int (* num-messages 0.51))
-       load (LoadMapping.)
-       _ (.setLocal load {(int 1) 0.0 (int 2) 0.0})
-       data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
-       load1 (.get freq [(int 1)])
-       load2 (.get freq [(int 2)])]
-    (log-message "FREQ:" freq)
-    (is (>= load1 min-prcnt))
-    (is (<= load1 max-prcnt))
-    (is (>= load2 min-prcnt))
-    (is (<= load2 max-prcnt))))
-
-(deftest test-shuffle-load-uneven
- (let [shuffler (GrouperFactory/mkGrouper nil "comp" "stream" nil shuffle-grouping [(int 1) (int 2)] {})
-       num-messages 100000
-       min1-prcnt (int (* num-messages 0.32))
-       max1-prcnt (int (* num-messages 0.34))
-       min2-prcnt (int (* num-messages 0.65))
-       max2-prcnt (int (* num-messages 0.67))
-       load (LoadMapping.)
-       _ (.setLocal load {(int 1) 0.5 (int 2) 0.0})
-       data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data load)))
-       load1 (.get freq [(int 1)])
-       load2 (.get freq [(int 2)])]
-    (log-message "FREQ:" freq)
-    (is (>= load1 min1-prcnt))
-    (is (<= load1 max1-prcnt))
-    (is (>= load2 min2-prcnt))
-    (is (<= load2 max2-prcnt))))
-
 (deftest test-field
   (with-open [cluster (.build (doto (LocalCluster$Builder.)
                                 (.withSimulatedTime)


[8/8] storm git commit: Merge branch 'STORM-2678' of https://github.com/HeartSaVioR/storm into STORM-2678

Posted by bo...@apache.org.
Merge branch 'STORM-2678' of https://github.com/HeartSaVioR/storm into STORM-2678

STORM-2678: Improve performance of LoadAwareShuffleGrouping

This closes #2261


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dad14e41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dad14e41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dad14e41

Branch: refs/heads/master
Commit: dad14e414619b1f336d18bc570ea165e072008ea
Parents: 9bf2022 4dade36
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 7 12:32:26 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Sep 7 12:32:26 2017 -0500

----------------------------------------------------------------------
 .../org/apache/storm/daemon/GrouperFactory.java |   8 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   |   2 +-
 .../org/apache/storm/daemon/worker/Worker.java  |  11 +-
 .../jvm/org/apache/storm/executor/Executor.java |  19 +
 .../apache/storm/executor/ExecutorShutdown.java |   6 +
 .../apache/storm/executor/IRunningExecutor.java |   2 +
 .../grouping/LoadAwareCustomStreamGrouping.java |   4 +-
 .../grouping/LoadAwareShuffleGrouping.java      | 126 ++++-
 .../grouping/LoadAwareShuffleGroupingTest.java  | 513 +++++++++++++++++++
 .../test/clj/org/apache/storm/grouping_test.clj |  38 +-
 10 files changed, 664 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dad14e41/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dad14e41/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------


[3/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* introduce a 'skip checking update count' counter
  * we no longer call System.currentTimeMillis() on every call
  * but we do call AtomicInteger.incrementAndGet() on every call
  * this may hurt multi-threaded performance, but it is much faster with a single thread


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8f63d5a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8f63d5a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8f63d5a8

Branch: refs/heads/master
Commit: 8f63d5a85609ca71019ab298859af0b9858262b8
Parents: cab86d0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 17:48:32 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 18:26:21 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 28 ++++++++---
 .../grouping/LoadAwareShuffleGroupingTest.java  | 52 ++++++++++++--------
 2 files changed, 52 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index b460a8a..84e4612 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -26,12 +26,16 @@ import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
     private static final int CAPACITY_TASK_MULTIPLICATION = 100;
 
+    @VisibleForTesting
+    static final int CHECK_UPDATE_INDEX = 100;
+
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
@@ -39,8 +43,9 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     private AtomicInteger current;
     private int actualCapacity = 0;
 
+    private AtomicInteger skipCheckingUpdateCount;
+    private AtomicBoolean isUpdating;
     private long lastUpdate = 0;
-    private AtomicBoolean isUpdating = new AtomicBoolean(false);
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -65,6 +70,8 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
         Collections.shuffle(choices, random);
         current = new AtomicInteger(0);
+        skipCheckingUpdateCount = new AtomicInteger(0);
+        isUpdating = new AtomicBoolean(false);
     }
 
     @Override
@@ -74,13 +81,18 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-        if ((lastUpdate + 1000) < System.currentTimeMillis()
-            && isUpdating.compareAndSet(false, true)) {
-            // update time earlier to reduce chance to do CAS
-            // concurrent call can still rely on old choices
-            lastUpdate = System.currentTimeMillis();
-            updateRing(load);
-            isUpdating.set(false);
+        if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
+            skipCheckingUpdateCount.set(0);
+            if ((lastUpdate + 1000) < System.currentTimeMillis()
+                && isUpdating.compareAndSet(false, true)) {
+                // before finishing updateRing(), concurrent call will still rely on old choices
+                updateRing(load);
+
+                // update time and open a chance to update ring again
+                lastUpdate = System.currentTimeMillis();
+                skipCheckingUpdateCount.set(0);
+                isUpdating.set(false);
+            }
         }
 
         int rightNow;

http://git-wip-us.apache.org/repos/asf/storm/blob/8f63d5a8/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index bf2269f..667f36d 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class LoadAwareShuffleGroupingTest {
+    public static final double ACCEPTABLE_MARGIN = 0.015;
     private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
 
     @Test
@@ -86,17 +87,19 @@ public class LoadAwareShuffleGroupingTest {
         final LoadMapping loadMapping = buildLocalTasksEvenLoadMapping(availableTaskIds);
 
         final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-
-        // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        // force triggers building ring
+        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        }
 
         // calling chooseTasks should be finished before refreshing ring
         // adjusting groupingExecutionsPerThread might be needed with really slow machine
         // we allow race condition between refreshing ring and choosing tasks
         // so it will not make exact even distribution, though diff is expected to be small
+        // given that all threadTasks are finished before refreshing ring,
+        // distribution should be exactly even
         final int groupingExecutionsPerThread = numTasks * 5000;
         final int numThreads = 10;
 
@@ -142,6 +145,19 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
+        int[] loads = new int[numTasks];
+        int localTotal = 0;
+        List<Double> loadRate = new ArrayList<>();
+        for (int i = 0; i < numTasks; i++) {
+            int val = (int)(101 - (loadMapping.get(i) * 100));
+            loads[i] = val;
+            localTotal += val;
+        }
+
+        for (int i = 0; i < numTasks; i++) {
+            loadRate.add(loads[i] * 1.0 / localTotal);
+        }
+
         for (int i = 0; i < numTasks; i++) {
             int expected = numThreads * groupingExecutionsPerThread / numTasks;
             assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
@@ -164,7 +180,7 @@ public class LoadAwareShuffleGroupingTest {
 
     @Test
     public void testLoadAwareShuffleGroupingWithRandomTasksAndRandomLoad() {
-        for (int trial = 0 ; trial < 100 ; trial++) {
+        for (int trial = 0 ; trial < 200 ; trial++) {
             // just pick arbitrary number in 5 ~ 100
             final int numTasks = new Random().nextInt(96) + 5;
             final LoadAwareShuffleGrouping grouper = new LoadAwareShuffleGrouping();
@@ -185,8 +201,8 @@ public class LoadAwareShuffleGroupingTest {
             .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
                 Lists.newArrayList(1, 2), Collections.emptyMap());
         int numMessages = 100000;
-        int minPrCount = (int) (numMessages * 0.49);
-        int maxPrCount = (int) (numMessages * 0.51);
+        int minPrCount = (int) (numMessages * (0.5 - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (numMessages * (0.5 + ACCEPTABLE_MARGIN));
         LoadMapping load = new LoadMapping();
         Map<Integer, Double> loadInfoMap = new HashMap<>();
         loadInfoMap.put(1, 0.0);
@@ -220,10 +236,10 @@ public class LoadAwareShuffleGroupingTest {
             .mkGrouper(null, "comp", "stream", null, Grouping.shuffle(new NullStruct()),
                 Lists.newArrayList(1, 2), Collections.emptyMap());
         int numMessages = 100000;
-        int min1PrCount = (int) (numMessages * 0.32);
-        int max1PrCount = (int) (numMessages * 0.34);
-        int min2PrCount = (int) (numMessages * 0.65);
-        int max2PrCount = (int) (numMessages * 0.67);
+        int min1PrCount = (int) (numMessages * (0.33 - ACCEPTABLE_MARGIN));
+        int max1PrCount = (int) (numMessages * (0.33 + ACCEPTABLE_MARGIN));
+        int min2PrCount = (int) (numMessages * (0.66 - ACCEPTABLE_MARGIN));
+        int max2PrCount = (int) (numMessages * (0.66 + ACCEPTABLE_MARGIN));
         LoadMapping load = new LoadMapping();
         Map<Integer, Double> loadInfoMap = new HashMap<>();
         loadInfoMap.put(1, 0.5);
@@ -337,8 +353,10 @@ public class LoadAwareShuffleGroupingTest {
         // Task Id not used, so just pick a static value
         int inputTaskId = 100;
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        // force triggers building ring
+        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        }
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
@@ -379,7 +397,7 @@ public class LoadAwareShuffleGroupingTest {
         int totalEmits = 5000 * numTasks;
         int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
 
-        int delta = (int) (totalEmits * 0.01);
+        int delta = (int) (totalEmits * ACCEPTABLE_MARGIN);
         for (int i = 0; i < numTasks; i++) {
             int expected = (int) (totalEmits * loadRate.get(i));
             assertTrue("Distribution should respect the task load with small delta",
@@ -395,9 +413,6 @@ public class LoadAwareShuffleGroupingTest {
         WorkerTopologyContext context = mock(WorkerTopologyContext.class);
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building distribution ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -431,9 +446,6 @@ public class LoadAwareShuffleGroupingTest {
         // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building ring
-        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {


[2/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* add multi-threaded performance tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cab86d08
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cab86d08
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cab86d08

Branch: refs/heads/master
Commit: cab86d08f6f49afaca5b49d6ed08079f700b3737
Parents: 4842788
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Aug 4 16:20:55 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Aug 4 16:20:55 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 200 +++++++++++++------
 1 file changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cab86d08/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 410c1f9..bf2269f 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -250,6 +250,86 @@ public class LoadAwareShuffleGroupingTest {
         assertTrue(load2 <= max2PrCount);
     }
 
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksEvenLoadMapping(availableTaskIds));
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
+        final int numTasks = 10;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksUnevenLoadMapping(availableTaskIds));
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded()
+        throws ExecutionException, InterruptedException {
+        final int numTasks = 10;
+        final int numThreads = 2;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksEvenLoadMapping(availableTaskIds), numThreads);
+    }
+
+    @Ignore
+    @Test
+    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded()
+        throws ExecutionException, InterruptedException {
+        final int numTasks = 10;
+        final int numThreads = 2;
+        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
+        runMultithreadedBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
+            buildLocalTasksUnevenLoadMapping(availableTaskIds), numThreads);
+    }
+
+    private List<Integer> getAvailableTaskIds(int numTasks) {
+        // this method should return sequential numbers starting at 0
+        final List<Integer> availableTaskIds = Lists.newArrayList();
+        for (int i = 0; i < numTasks; i++) {
+            availableTaskIds.add(i);
+        }
+        return availableTaskIds;
+    }
+
+    private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1);
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+    private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
+        LoadMapping loadMapping = new LoadMapping();
+        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
+        for (int i = 0; i < availableTasks.size(); i++) {
+            localLoadMap.put(availableTasks.get(i), Math.random());
+        }
+        loadMapping.setLocal(localLoadMap);
+        return loadMapping;
+    }
+
+
     private int[] runChooseTasksWithVerification(LoadAwareShuffleGrouping grouper, int totalEmits,
         int numTasks, LoadMapping loadMapping) {
         int[] taskCounts = new int[numTasks];
@@ -307,72 +387,51 @@ public class LoadAwareShuffleGroupingTest {
         }
     }
 
-    @Ignore
-    @Test
-    public void testBenchmarkLoadAwareShuffleGroupingEvenLoad() {
-        final int numTasks = 10;
-        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
-            buildLocalTasksEvenLoadMapping(availableTaskIds));
-    }
+    private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
+        List<Integer> availableTaskIds, LoadMapping loadMapping) {
+        // Task Id not used, so just pick a static value
+        final int inputTaskId = 100;
 
-    @Ignore
-    @Test
-    public void testBenchmarkLoadAwareShuffleGroupingUnevenLoad() {
-        final int numTasks = 10;
-        List<Integer> availableTaskIds = getAvailableTaskIds(numTasks);
-        runSimpleBenchmark(new LoadAwareShuffleGrouping(), availableTaskIds,
-            buildLocalTasksUnevenLoadMapping(availableTaskIds));
-    }
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        grouper.prepare(context, null, availableTaskIds);
 
-    private List<Integer> getAvailableTaskIds(int numTasks) {
-        // this method should return sequential numbers starting at 0
-        final List<Integer> availableTaskIds = Lists.newArrayList();
-        for (int i = 0; i < numTasks; i++) {
-            availableTaskIds.add(i);
-        }
-        return availableTaskIds;
-    }
+        // triggers building distribution ring
+        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
-    private LoadMapping buildLocalTasksEvenLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), 0.1);
-        }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
-    }
+        long current = System.currentTimeMillis();
+        int idx = 0;
+        while (true) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
-    private LoadMapping buildLocalTasksUnevenLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), 0.1 * (i + 1));
+            idx++;
+            if (idx % 100000 == 0) {
+                // warm up 60 seconds
+                if (System.currentTimeMillis() - current >= 60_000) {
+                    break;
+                }
+            }
         }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
-    }
 
-    private LoadMapping buildLocalTasksRandomLoadMapping(List<Integer> availableTasks) {
-        LoadMapping loadMapping = new LoadMapping();
-        Map<Integer, Double> localLoadMap = new HashMap<>(availableTasks.size());
-        for (int i = 0; i < availableTasks.size(); i++) {
-            localLoadMap.put(availableTasks.get(i), Math.random());
+        current = System.currentTimeMillis();
+        for (int i = 1; i <= 2_000_000_000 ; i++) {
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
         }
-        loadMapping.setLocal(localLoadMap);
-        return loadMapping;
+
+        LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
     }
 
-    private void runSimpleBenchmark(LoadAwareCustomStreamGrouping grouper,
-        List<Integer> availableTaskIds, LoadMapping loadMapping) {
+    private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
+        List<Integer> availableTaskIds, LoadMapping loadMapping, int numThreads)
+        throws InterruptedException, ExecutionException {
         // Task Id not used, so just pick a static value
         final int inputTaskId = 100;
 
-        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+
+        // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
-        // triggers building distribution ring
+        // triggers building ring
         grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
 
         long current = System.currentTimeMillis();
@@ -389,11 +448,40 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
-        current = System.currentTimeMillis();
-        for (int i = 1; i <= 2_000_000_000 ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+        final int groupingExecutionsPerThread = 2_000_000_000;
+
+        List<Callable<Long>> threadTasks = Lists.newArrayList();
+        for (int x = 0; x < numThreads; x++) {
+            Callable<Long> threadTask = new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                    long current = System.currentTimeMillis();
+                    for (int i = 1; i <= groupingExecutionsPerThread; i++) {
+                        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                    }
+                    return System.currentTimeMillis() - current;
+                }
+            };
+
+            // Add to our collection.
+            threadTasks.add(threadTask);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
+        List<Future<Long>> taskResults = executor.invokeAll(threadTasks);
+
+        // Wait for all tasks to complete
+        Long maxDurationMillis = 0L;
+        for (Future taskResult: taskResults) {
+            while (!taskResult.isDone()) {
+                Thread.sleep(100);
+            }
+            Long durationMillis = (Long) taskResult.get();
+            if (maxDurationMillis < durationMillis) {
+                maxDurationMillis = durationMillis;
+            }
         }
 
-        System.out.println("Duration: " + (System.currentTimeMillis() - current) + " ms");
+        LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
     }
 }
\ No newline at end of file


[4/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* add a new way to provide LoadMapping: via push
* refresh load and push the updated LoadMapping to all groupers each time refreshLoadTimer fires
* update tests to reflect the change


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1fd83a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1fd83a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1fd83a9

Branch: refs/heads/master
Commit: a1fd83a987995430269cd25ef360f703e7eb8f99
Parents: 8f63d5a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 7 23:51:39 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 7 23:51:39 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/daemon/GrouperFactory.java | 10 ++++++
 .../org/apache/storm/daemon/worker/Worker.java  | 11 ++++++-
 .../jvm/org/apache/storm/executor/Executor.java | 19 +++++++++++
 .../apache/storm/executor/ExecutorShutdown.java |  6 ++++
 .../apache/storm/executor/IRunningExecutor.java |  2 ++
 .../grouping/LoadAwareCustomStreamGrouping.java |  1 +
 .../grouping/LoadAwareShuffleGrouping.java      | 28 +++-------------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 34 ++++++++++++++++----
 8 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 7c61136..03e029e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -119,6 +119,11 @@ public class GrouperFactory {
         }
 
         @Override
+        public void refreshLoad(LoadMapping loadMapping) {
+
+        }
+
+        @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
             return customStreamGrouping.chooseTasks(taskId, values);
         }
@@ -224,6 +229,11 @@ public class GrouperFactory {
     // A no-op grouper
     public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
         @Override
+        public void refreshLoad(LoadMapping loadMapping) {
+
+        }
+
+        @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 59e86c7..0b2ab40 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -265,7 +265,7 @@ public class Worker implements Shutdownable, DaemonCommon {
 
                 // The jitter allows the clients to get the data at different times, and avoids thundering herd
                 if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
                 }
 
                 workerState.refreshConnectionsTimer.scheduleRecurring(0,
@@ -285,6 +285,15 @@ public class Worker implements Shutdownable, DaemonCommon {
 
     }
 
+    public void doRefreshLoad() {
+        workerState.refreshLoad();
+
+        final List<IRunningExecutor> executors = executorsAtom.get();
+        for (IRunningExecutor executor : executors) {
+            executor.loadChanged(workerState.getLoadMapping());
+        }
+    }
+
     public void doHeartBeat() throws IOException {
         LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
         state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index c1c6350..5f2adf7 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -25,9 +25,11 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -54,6 +56,7 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.stats.BoltExecutorStats;
@@ -79,6 +82,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 public abstract class Executor implements Callable, EventHandler<Object> {
 
@@ -102,6 +106,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
     protected CommonStats stats;
     protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
     protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+    protected final List<LoadAwareCustomStreamGrouping> groupers;
     protected final ReportErrorAndDie reportErrorDie;
     protected final Callable<Boolean> sampler;
     protected ExecutorTransfer executorTransfer;
@@ -162,6 +167,13 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         this.intervalToTaskToMetricToRegistry = new HashMap<>();
         this.taskToComponent = workerData.getTaskToComponent();
         this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, topoConf);
+        if (this.streamToComponentToGrouper != null) {
+            this.groupers = streamToComponentToGrouper.values().stream()
+                .filter(Objects::nonNull)
+                .flatMap(m -> m.values().stream()).collect(Collectors.toList());
+        } else {
+            this.groupers = Collections.emptyList();
+        }
         this.reportError = new ReportError(topoConf, stormClusterState, stormId, componentId, workerTopologyContext);
         this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
         this.sampler = ConfigUtils.mkStatsSampler(topoConf);
@@ -340,6 +352,12 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         }
     }
 
+    public void reflectNewLoadMapping(LoadMapping loadMapping) {
+        for (LoadAwareCustomStreamGrouping g : groupers) {
+            g.refreshLoad(loadMapping);
+        }
+    }
+
     private void registerBackpressure() {
         receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
             @Override
@@ -589,4 +607,5 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         }
         return ret;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 144ee1b..7ea48b0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -23,6 +23,7 @@ import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.Task;
 import org.apache.storm.generated.Credentials;
 import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.task.IBolt;
@@ -69,6 +70,11 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
     }
 
     @Override
+    public void loadChanged(LoadMapping loadMapping) {
+        executor.reflectNewLoadMapping(loadMapping);
+    }
+
+    @Override
     public boolean getBackPressureFlag() {
         return executor.getBackpressure();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index 441fdff..e7a4117 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -19,6 +19,7 @@ package org.apache.storm.executor;
 
 import org.apache.storm.generated.Credentials;
 import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
 
 import java.util.List;
 
@@ -27,5 +28,6 @@ public interface IRunningExecutor {
     ExecutorStats renderStats();
     List<Long> getExecutorId();
     void credentialsChanged(Credentials credentials);
+    void loadChanged(LoadMapping loadMapping);
     boolean getBackPressureFlag();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index d825d2e..fba7254 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -20,5 +20,6 @@ package org.apache.storm.grouping;
 import java.util.List;
 
 public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
+   void refreshLoad(LoadMapping loadMapping);
    List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 84e4612..20437a1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -33,9 +33,6 @@ import org.apache.storm.task.WorkerTopologyContext;
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
     private static final int CAPACITY_TASK_MULTIPLICATION = 100;
 
-    @VisibleForTesting
-    static final int CHECK_UPDATE_INDEX = 100;
-
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
@@ -43,10 +40,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     private AtomicInteger current;
     private int actualCapacity = 0;
 
-    private AtomicInteger skipCheckingUpdateCount;
-    private AtomicBoolean isUpdating;
-    private long lastUpdate = 0;
-
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         random = new Random();
@@ -70,8 +63,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
         Collections.shuffle(choices, random);
         current = new AtomicInteger(0);
-        skipCheckingUpdateCount = new AtomicInteger(0);
-        isUpdating = new AtomicBoolean(false);
     }
 
     @Override
@@ -80,21 +71,12 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     }
 
     @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-        if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
-            skipCheckingUpdateCount.set(0);
-            if ((lastUpdate + 1000) < System.currentTimeMillis()
-                && isUpdating.compareAndSet(false, true)) {
-                // before finishing updateRing(), concurrent call will still rely on old choices
-                updateRing(load);
-
-                // update time and open a chance to update ring again
-                lastUpdate = System.currentTimeMillis();
-                skipCheckingUpdateCount.set(0);
-                isUpdating.set(false);
-            }
-        }
+    public void refreshLoad(LoadMapping loadMapping) {
+        updateRing(loadMapping);
+    }
 
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
         int rightNow;
         int size = choices.size();
         while (true) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 667f36d..ed08a05 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -18,10 +18,13 @@
 package org.apache.storm.grouping;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.storm.StormTimer;
 import org.apache.storm.daemon.GrouperFactory;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,6 +41,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -90,9 +96,7 @@ public class LoadAwareShuffleGroupingTest {
         grouper.prepare(context, null, availableTaskIds);
 
         // force triggers building ring
-        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-        }
+        grouper.refreshLoad(loadMapping);
 
         // calling chooseTasks should be finished before refreshing ring
         // adjusting groupingExecutionsPerThread might be needed with really slow machine
@@ -209,6 +213,9 @@ public class LoadAwareShuffleGroupingTest {
         loadInfoMap.put(2, 0.0);
         load.setLocal(loadInfoMap);
 
+        // force triggers building ring
+        shuffler.refreshLoad(load);
+
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3];
         for (int i = 0 ; i < numMessages ; i++) {
@@ -246,6 +253,9 @@ public class LoadAwareShuffleGroupingTest {
         loadInfoMap.put(2, 0.0);
         load.setLocal(loadInfoMap);
 
+        // force triggers building ring
+        shuffler.refreshLoad(load);
+
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3]; // task id starts from 1
         for (int i = 0 ; i < numMessages ; i++) {
@@ -354,9 +364,7 @@ public class LoadAwareShuffleGroupingTest {
         int inputTaskId = 100;
 
         // force triggers building ring
-        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-        }
+        grouper.refreshLoad(loadMapping);
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
@@ -413,6 +421,11 @@ public class LoadAwareShuffleGroupingTest {
         WorkerTopologyContext context = mock(WorkerTopologyContext.class);
         grouper.prepare(context, null, availableTaskIds);
 
+        // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+        ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+            new ScheduledThreadPoolExecutor(1));
+        refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -433,6 +446,8 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
+
+        refreshService.shutdownNow();
     }
 
     private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
@@ -446,6 +461,11 @@ public class LoadAwareShuffleGroupingTest {
         // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
+        // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+        ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+            new ScheduledThreadPoolExecutor(1));
+        refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -495,5 +515,7 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
+
+        refreshService.shutdownNow();
     }
 }
\ No newline at end of file


[5/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* change all collections to arrays and pre-allocate them
* use a fixed length for choices
* pre-allocate a backup array for choices and swap it in, to avoid allocating arrays on every ring update


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08038b6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08038b6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08038b6d

Branch: refs/heads/master
Commit: 08038b6d65462788796348e40adbacfa6d2f9467
Parents: a1fd83a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 9 07:18:21 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 9 13:23:50 2017 +0900

----------------------------------------------------------------------
 .../grouping/LoadAwareShuffleGrouping.java      | 103 ++++++++++++-------
 .../grouping/LoadAwareShuffleGroupingTest.java  |  27 ++---
 2 files changed, 77 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 20437a1..080c48d 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -27,18 +27,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
-    private static final int CAPACITY_TASK_MULTIPLICATION = 100;
+    private static final int CAPACITY = 1000;
 
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
-    private ArrayList<List<Integer>> choices;
+    private int[] loads;
+    private int[] unassigned;
+    private int[] choices;
+    private int[] prepareChoices;
     private AtomicInteger current;
-    private int actualCapacity = 0;
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
@@ -51,18 +54,22 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
             targets[i] = targetTasks.get(i);
         }
 
-        actualCapacity = targets.length * CAPACITY_TASK_MULTIPLICATION;
-
         // can't leave choices to be empty, so initiate it similar as ShuffleGrouping
-        choices = new ArrayList<>(actualCapacity);
-        int index = 0;
-        while (index < actualCapacity) {
-            choices.add(rets[index % targets.length]);
-            index++;
+        choices = new int[CAPACITY];
+
+        for (int i = 0 ; i < CAPACITY ; i++) {
+            choices[i] = i % rets.length;
         }
 
-        Collections.shuffle(choices, random);
+        shuffleArray(choices);
         current = new AtomicInteger(0);
+
+        // allocate another array to be switched
+        prepareChoices = new int[CAPACITY];
+
+        // allocating only once
+        loads = new int[targets.length];
+        unassigned = new int[targets.length];
     }
 
     @Override
@@ -78,14 +85,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
         int rightNow;
-        int size = choices.size();
         while (true) {
             rightNow = current.incrementAndGet();
-            if (rightNow < size) {
-                return choices.get(rightNow);
-            } else if (rightNow == size) {
+            if (rightNow < CAPACITY) {
+                return rets[choices[rightNow]];
+            } else if (rightNow == CAPACITY) {
                 current.set(0);
-                return choices.get(0);
+                return rets[choices[0]];
             }
             //race condition with another thread, and we lost
             // try again
@@ -94,45 +100,70 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     private void updateRing(LoadMapping load) {
         int localTotal = 0;
-        int[] loads = new int[targets.length];
         for (int i = 0 ; i < targets.length; i++) {
             int val = (int)(101 - (load.get(targets[i]) * 100));
             loads[i] = val;
             localTotal += val;
         }
 
-        // allocating enough memory doesn't hurt much, so assign aggressively
-        // we will cut out if actual size becomes bigger than actualCapacity
-        ArrayList<List<Integer>> newChoices = new ArrayList<>(actualCapacity + targets.length);
+        int currentIdx = 0;
+        int unassignedIdx = 0;
         for (int i = 0 ; i < loads.length ; i++) {
+            if (currentIdx == CAPACITY) {
+                break;
+            }
+
             int loadForTask = loads[i];
-            int amount = loadForTask * actualCapacity / localTotal;
+            int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
             // assign at least one for task
             if (amount == 0) {
-                amount = 1;
+                unassigned[unassignedIdx++] = i;
             }
             for (int j = 0; j < amount; j++) {
-                newChoices.add(rets[i]);
+                if (currentIdx == CAPACITY) {
+                    break;
+                }
+
+                prepareChoices[currentIdx++] = i;
             }
         }
 
-        Collections.shuffle(newChoices, random);
-
-        // make sure length of newChoices is same as actualCapacity, like current choices
-        // this ensures safety when requests and update occurs concurrently
-        if (newChoices.size() > actualCapacity) {
-            newChoices.subList(actualCapacity, newChoices.size()).clear();
-        } else if (newChoices.size() < actualCapacity) {
-            int remaining = actualCapacity - newChoices.size();
-            while (remaining > 0) {
-                newChoices.add(newChoices.get(remaining % newChoices.size()));
-                remaining--;
+        if (currentIdx < CAPACITY) {
+            // if there're some rooms, give unassigned tasks a chance to be included
+            // this should be really small amount, so just add them sequentially
+            if (unassignedIdx > 0) {
+                for (int i = currentIdx ; i < CAPACITY ; i++) {
+                    prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
+                }
+            } else {
+                // just pick random
+                for (int i = currentIdx ; i < CAPACITY ; i++) {
+                    prepareChoices[i] = random.nextInt(loads.length);
+                }
             }
         }
 
-        assert newChoices.size() == actualCapacity;
+        shuffleArray(prepareChoices);
+
+        // swapping two arrays
+        int[] tempForSwap = choices;
+        choices = prepareChoices;
+        prepareChoices = tempForSwap;
 
-        choices = newChoices;
         current.set(0);
     }
+
+    private void shuffleArray(int[] arr) {
+        int size = arr.length;
+        for (int i = size; i > 1; i--) {
+            swap(arr, i - 1, random.nextInt(i));
+        }
+    }
+
+    private void swap(int[] arr, int i, int j) {
+        int tmp = arr[i];
+        arr[i] = arr[j];
+        arr[j] = tmp;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/08038b6d/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index ed08a05..2d90d66 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -71,12 +71,14 @@ public class LoadAwareShuffleGroupingTest {
         // distribution should be even for all nodes when loads are even
         int desiredTaskCountPerTask = 5000;
         int totalEmits = numTasks * desiredTaskCountPerTask;
+        int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
 
         int[] taskCounts = runChooseTasksWithVerification(grouper, totalEmits, numTasks, loadMapping);
 
         for (int i = 0; i < numTasks; i++) {
-            assertEquals("Distribution should be even for all nodes", desiredTaskCountPerTask,
-                taskCounts[i]);
+            assertTrue("Distribution should be even for all nodes with small delta",
+                taskCounts[i] >= minPrCount && taskCounts[i] <= maxPrCount);
         }
     }
 
@@ -106,6 +108,7 @@ public class LoadAwareShuffleGroupingTest {
         // distribution should be exactly even
         final int groupingExecutionsPerThread = numTasks * 5000;
         final int numThreads = 10;
+        int totalEmits = groupingExecutionsPerThread * numThreads;
 
         List<Callable<int[]>> threadTasks = Lists.newArrayList();
         for (int x = 0; x < numThreads; x++) {
@@ -149,22 +152,12 @@ public class LoadAwareShuffleGroupingTest {
             }
         }
 
-        int[] loads = new int[numTasks];
-        int localTotal = 0;
-        List<Double> loadRate = new ArrayList<>();
-        for (int i = 0; i < numTasks; i++) {
-            int val = (int)(101 - (loadMapping.get(i) * 100));
-            loads[i] = val;
-            localTotal += val;
-        }
-
-        for (int i = 0; i < numTasks; i++) {
-            loadRate.add(loads[i] * 1.0 / localTotal);
-        }
+        int minPrCount = (int) (totalEmits * ((1.0 / numTasks) - ACCEPTABLE_MARGIN));
+        int maxPrCount = (int) (totalEmits * ((1.0 / numTasks) + ACCEPTABLE_MARGIN));
 
         for (int i = 0; i < numTasks; i++) {
-            int expected = numThreads * groupingExecutionsPerThread / numTasks;
-            assertEquals("Distribution should be even for all nodes", expected, taskIdTotals[i]);
+            assertTrue("Distribution should be even for all nodes with small delta",
+                taskIdTotals[i] >= minPrCount && taskIdTotals[i] <= maxPrCount);
         }
     }
 
@@ -441,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         current = System.currentTimeMillis();
-        for (int i = 1; i <= 2_000_000_000 ; i++) {
+        for (int i = 1; i <= 2_000_000_000; i++) {
             grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
         }
 


[6/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* Let chooseTasks() read from index 0, not 1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab38a7a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab38a7a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab38a7a0

Branch: refs/heads/master
Commit: ab38a7a0c87a857f9a9021d9805d8d63c794ef74
Parents: 08038b6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Aug 23 23:42:23 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Aug 23 23:42:23 2017 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ab38a7a0/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 080c48d..97c5ce1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -62,7 +62,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
 
         shuffleArray(choices);
-        current = new AtomicInteger(0);
+        current = new AtomicInteger(-1);
 
         // allocate another array to be switched
         prepareChoices = new int[CAPACITY];
@@ -150,7 +150,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         choices = prepareChoices;
         prepareChoices = tempForSwap;
 
-        current.set(0);
+        current.set(-1);
     }
 
     private void shuffleArray(int[] arr) {


[7/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

Posted by bo...@apache.org.
STORM-2678 Improve performance of LoadAwareShuffleGrouping

* remove LoadAware-specific chooseTasks method


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dade36c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dade36c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dade36c

Branch: refs/heads/master
Commit: 4dade36c6200a1fea06aa207f3f6d8470b725d0f
Parents: ab38a7a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Aug 24 09:15:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Aug 24 09:15:27 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/daemon/GrouperFactory.java    | 10 ----------
 .../src/jvm/org/apache/storm/daemon/Task.java      |  2 +-
 .../grouping/LoadAwareCustomStreamGrouping.java    |  3 ---
 .../storm/grouping/LoadAwareShuffleGrouping.java   | 15 +++++----------
 .../grouping/LoadAwareShuffleGroupingTest.java     | 17 ++++++++---------
 .../test/clj/org/apache/storm/grouping_test.clj    |  2 +-
 6 files changed, 15 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 03e029e..179e882 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -124,11 +124,6 @@ public class GrouperFactory {
         }
 
         @Override
-        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-            return customStreamGrouping.chooseTasks(taskId, values);
-        }
-
-        @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
             customStreamGrouping.prepare(context, stream, targetTasks);
         }
@@ -234,11 +229,6 @@ public class GrouperFactory {
         }
 
         @Override
-        public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-            return null;
-        }
-
-        @Override
         public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
 
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index b79a259..0bc56fb 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -137,7 +137,7 @@ public class Task {
                 if (grouper == GrouperFactory.DIRECT) {
                     throw new IllegalArgumentException("Cannot do regular emit to direct stream");
                 }
-                List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
+                List<Integer> compTasks = grouper.chooseTasks(taskId, values);
                 outTasks.addAll(compTasks);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index fba7254..6e5a3fd 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -17,9 +17,6 @@
  */
 package org.apache.storm.grouping;
 
-import java.util.List;
-
 public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
-   List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 97c5ce1..0c0560a 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -74,16 +74,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        throw new RuntimeException("NOT IMPLEMENTED");
-    }
-
-    @Override
-    public void refreshLoad(LoadMapping loadMapping) {
-        updateRing(loadMapping);
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
         int rightNow;
         while (true) {
             rightNow = current.incrementAndGet();
@@ -98,6 +88,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
     }
 
+    @Override
+    public void refreshLoad(LoadMapping loadMapping) {
+        updateRing(loadMapping);
+    }
+
     private void updateRing(LoadMapping load) {
         int localTotal = 0;
         for (int i = 0 ; i < targets.length; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2d90d66..53ac404 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -117,8 +117,7 @@ public class LoadAwareShuffleGroupingTest {
                 public int[] call() throws Exception {
                     int[] taskCounts = new int[availableTaskIds.size()];
                     for (int i = 1; i <= groupingExecutionsPerThread; i++) {
-                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId,
-                            Lists.newArrayList(), loadMapping);
+                        List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
                         // Validate a single task id return
                         assertNotNull("Not null taskId list returned", taskIds);
@@ -212,7 +211,7 @@ public class LoadAwareShuffleGroupingTest {
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3];
         for (int i = 0 ; i < numMessages ; i++) {
-            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            List<Integer> tasks = shuffler.chooseTasks(1, data);
             for (int task : tasks) {
                 frequencies[task]++;
             }
@@ -252,7 +251,7 @@ public class LoadAwareShuffleGroupingTest {
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3]; // task id starts from 1
         for (int i = 0 ; i < numMessages ; i++) {
-            List<Integer> tasks = shuffler.chooseTasks(1, data, load);
+            List<Integer> tasks = shuffler.chooseTasks(1, data);
             for (int task : tasks) {
                 frequencies[task]++;
             }
@@ -361,7 +360,7 @@ public class LoadAwareShuffleGroupingTest {
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
-                .chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                .chooseTasks(inputTaskId, Lists.newArrayList());
 
             // Validate a single task id return
             assertNotNull("Not null taskId list returned", taskIds);
@@ -422,7 +421,7 @@ public class LoadAwareShuffleGroupingTest {
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
             idx++;
             if (idx % 100000 == 0) {
@@ -435,7 +434,7 @@ public class LoadAwareShuffleGroupingTest {
 
         current = System.currentTimeMillis();
         for (int i = 1; i <= 2_000_000_000; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
         }
 
         LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
@@ -462,7 +461,7 @@ public class LoadAwareShuffleGroupingTest {
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+            grouper.chooseTasks(inputTaskId, Lists.newArrayList());
 
             idx++;
             if (idx % 100000 == 0) {
@@ -482,7 +481,7 @@ public class LoadAwareShuffleGroupingTest {
                 public Long call() throws Exception {
                     long current = System.currentTimeMillis();
                     for (int i = 1; i <= groupingExecutionsPerThread; i++) {
-                        grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
+                        grouper.chooseTasks(inputTaskId, Lists.newArrayList());
                     }
                     return System.currentTimeMillis() - current;
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4dade36c/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index 68ebb4e..c216008 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -32,7 +32,7 @@
        min-prcnt (int (* num-messages 0.49))
        max-prcnt (int (* num-messages 0.51))
        data [1 2]
-       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data nil)))
+       freq (frequencies (for [x (range 0 num-messages)] (.chooseTasks shuffler (int 1) data)))
        load1 (.get freq [(int 1)])
        load2 (.get freq [(int 2)])]
     (log-message "FREQ:" freq)