You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:22 UTC

[4/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping

STORM-2678 Improve performance of LoadAwareShuffleGrouping

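* Add a push-based way to provide the LoadMapping to groupers (LoadAwareCustomStreamGrouping.refreshLoad)
* When refreshLoadTimer fires, refresh the load and push the updated LoadMapping to all groupers
* Remove the periodic ring-update check from LoadAwareShuffleGrouping.chooseTasks
* Update tests to reflect the change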


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1fd83a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1fd83a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1fd83a9

Branch: refs/heads/master
Commit: a1fd83a987995430269cd25ef360f703e7eb8f99
Parents: 8f63d5a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 7 23:51:39 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 7 23:51:39 2017 +0900

----------------------------------------------------------------------
 .../org/apache/storm/daemon/GrouperFactory.java | 10 ++++++
 .../org/apache/storm/daemon/worker/Worker.java  | 11 ++++++-
 .../jvm/org/apache/storm/executor/Executor.java | 19 +++++++++++
 .../apache/storm/executor/ExecutorShutdown.java |  6 ++++
 .../apache/storm/executor/IRunningExecutor.java |  2 ++
 .../grouping/LoadAwareCustomStreamGrouping.java |  1 +
 .../grouping/LoadAwareShuffleGrouping.java      | 28 +++-------------
 .../grouping/LoadAwareShuffleGroupingTest.java  | 34 ++++++++++++++++----
 8 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 7c61136..03e029e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -119,6 +119,11 @@ public class GrouperFactory {
         }
 
         @Override
+        public void refreshLoad(LoadMapping loadMapping) {
+
+        }
+
+        @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
             return customStreamGrouping.chooseTasks(taskId, values);
         }
@@ -224,6 +229,11 @@ public class GrouperFactory {
     // A no-op grouper
     public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
         @Override
+        public void refreshLoad(LoadMapping loadMapping) {
+
+        }
+
+        @Override
         public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 59e86c7..0b2ab40 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -265,7 +265,7 @@ public class Worker implements Shutdownable, DaemonCommon {
 
                 // The jitter allows the clients to get the data at different times, and avoids thundering herd
                 if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
                 }
 
                 workerState.refreshConnectionsTimer.scheduleRecurring(0,
@@ -285,6 +285,15 @@ public class Worker implements Shutdownable, DaemonCommon {
 
     }
 
+    public void doRefreshLoad() {
+        workerState.refreshLoad();
+
+        final List<IRunningExecutor> executors = executorsAtom.get();
+        for (IRunningExecutor executor : executors) {
+            executor.loadChanged(workerState.getLoadMapping());
+        }
+    }
+
     public void doHeartBeat() throws IOException {
         LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
         state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index c1c6350..5f2adf7 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -25,9 +25,11 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -54,6 +56,7 @@ import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.stats.BoltExecutorStats;
@@ -79,6 +82,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 public abstract class Executor implements Callable, EventHandler<Object> {
 
@@ -102,6 +106,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
     protected CommonStats stats;
     protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
     protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+    protected final List<LoadAwareCustomStreamGrouping> groupers;
     protected final ReportErrorAndDie reportErrorDie;
     protected final Callable<Boolean> sampler;
     protected ExecutorTransfer executorTransfer;
@@ -162,6 +167,13 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         this.intervalToTaskToMetricToRegistry = new HashMap<>();
         this.taskToComponent = workerData.getTaskToComponent();
         this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, topoConf);
+        if (this.streamToComponentToGrouper != null) {
+            this.groupers = streamToComponentToGrouper.values().stream()
+                .filter(Objects::nonNull)
+                .flatMap(m -> m.values().stream()).collect(Collectors.toList());
+        } else {
+            this.groupers = Collections.emptyList();
+        }
         this.reportError = new ReportError(topoConf, stormClusterState, stormId, componentId, workerTopologyContext);
         this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
         this.sampler = ConfigUtils.mkStatsSampler(topoConf);
@@ -340,6 +352,12 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         }
     }
 
+    public void reflectNewLoadMapping(LoadMapping loadMapping) {
+        for (LoadAwareCustomStreamGrouping g : groupers) {
+            g.refreshLoad(loadMapping);
+        }
+    }
+
     private void registerBackpressure() {
         receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
             @Override
@@ -589,4 +607,5 @@ public abstract class Executor implements Callable, EventHandler<Object> {
         }
         return ret;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 144ee1b..7ea48b0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -23,6 +23,7 @@ import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.Task;
 import org.apache.storm.generated.Credentials;
 import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.task.IBolt;
@@ -69,6 +70,11 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
     }
 
     @Override
+    public void loadChanged(LoadMapping loadMapping) {
+        executor.reflectNewLoadMapping(loadMapping);
+    }
+
+    @Override
     public boolean getBackPressureFlag() {
         return executor.getBackpressure();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index 441fdff..e7a4117 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -19,6 +19,7 @@ package org.apache.storm.executor;
 
 import org.apache.storm.generated.Credentials;
 import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
 
 import java.util.List;
 
@@ -27,5 +28,6 @@ public interface IRunningExecutor {
     ExecutorStats renderStats();
     List<Long> getExecutorId();
     void credentialsChanged(Credentials credentials);
+    void loadChanged(LoadMapping loadMapping);
     boolean getBackPressureFlag();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index d825d2e..fba7254 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -20,5 +20,6 @@ package org.apache.storm.grouping;
 import java.util.List;
 
 public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
+   void refreshLoad(LoadMapping loadMapping);
    List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 84e4612..20437a1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -33,9 +33,6 @@ import org.apache.storm.task.WorkerTopologyContext;
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
     private static final int CAPACITY_TASK_MULTIPLICATION = 100;
 
-    @VisibleForTesting
-    static final int CHECK_UPDATE_INDEX = 100;
-
     private Random random;
     private List<Integer>[] rets;
     private int[] targets;
@@ -43,10 +40,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     private AtomicInteger current;
     private int actualCapacity = 0;
 
-    private AtomicInteger skipCheckingUpdateCount;
-    private AtomicBoolean isUpdating;
-    private long lastUpdate = 0;
-
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         random = new Random();
@@ -70,8 +63,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
         Collections.shuffle(choices, random);
         current = new AtomicInteger(0);
-        skipCheckingUpdateCount = new AtomicInteger(0);
-        isUpdating = new AtomicBoolean(false);
     }
 
     @Override
@@ -80,21 +71,12 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     }
 
     @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-        if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
-            skipCheckingUpdateCount.set(0);
-            if ((lastUpdate + 1000) < System.currentTimeMillis()
-                && isUpdating.compareAndSet(false, true)) {
-                // before finishing updateRing(), concurrent call will still rely on old choices
-                updateRing(load);
-
-                // update time and open a chance to update ring again
-                lastUpdate = System.currentTimeMillis();
-                skipCheckingUpdateCount.set(0);
-                isUpdating.set(false);
-            }
-        }
+    public void refreshLoad(LoadMapping loadMapping) {
+        updateRing(loadMapping);
+    }
 
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
         int rightNow;
         int size = choices.size();
         while (true) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 667f36d..ed08a05 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -18,10 +18,13 @@
 package org.apache.storm.grouping;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.storm.StormTimer;
 import org.apache.storm.daemon.GrouperFactory;
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,6 +41,9 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -90,9 +96,7 @@ public class LoadAwareShuffleGroupingTest {
         grouper.prepare(context, null, availableTaskIds);
 
         // force triggers building ring
-        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-        }
+        grouper.refreshLoad(loadMapping);
 
         // calling chooseTasks should be finished before refreshing ring
         // adjusting groupingExecutionsPerThread might be needed with really slow machine
@@ -209,6 +213,9 @@ public class LoadAwareShuffleGroupingTest {
         loadInfoMap.put(2, 0.0);
         load.setLocal(loadInfoMap);
 
+        // force triggers building ring
+        shuffler.refreshLoad(load);
+
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3];
         for (int i = 0 ; i < numMessages ; i++) {
@@ -246,6 +253,9 @@ public class LoadAwareShuffleGroupingTest {
         loadInfoMap.put(2, 0.0);
         load.setLocal(loadInfoMap);
 
+        // force triggers building ring
+        shuffler.refreshLoad(load);
+
         List<Object> data = Lists.newArrayList(1, 2);
         int[] frequencies = new int[3]; // task id starts from 1
         for (int i = 0 ; i < numMessages ; i++) {
@@ -354,9 +364,7 @@ public class LoadAwareShuffleGroupingTest {
         int inputTaskId = 100;
 
         // force triggers building ring
-        for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
-            grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
-        }
+        grouper.refreshLoad(loadMapping);
 
         for (int i = 1; i <= totalEmits; i++) {
             List<Integer> taskIds = grouper
@@ -413,6 +421,11 @@ public class LoadAwareShuffleGroupingTest {
         WorkerTopologyContext context = mock(WorkerTopologyContext.class);
         grouper.prepare(context, null, availableTaskIds);
 
+        // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+        ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+            new ScheduledThreadPoolExecutor(1));
+        refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -433,6 +446,8 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
+
+        refreshService.shutdownNow();
     }
 
     private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
@@ -446,6 +461,11 @@ public class LoadAwareShuffleGroupingTest {
         // Call prepare with our available taskIds
         grouper.prepare(context, null, availableTaskIds);
 
+        // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+        ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+            new ScheduledThreadPoolExecutor(1));
+        refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
         long current = System.currentTimeMillis();
         int idx = 0;
         while (true) {
@@ -495,5 +515,7 @@ public class LoadAwareShuffleGroupingTest {
         }
 
         LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
+
+        refreshService.shutdownNow();
     }
 }
\ No newline at end of file