You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink on the original page and is not preserved in this text.)
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/07 18:09:22 UTC
[4/8] storm git commit: STORM-2678 Improve performance of LoadAwareShuffleGrouping
STORM-2678 Improve performance of LoadAwareShuffleGrouping
* add a new way to provide the LoadMapping to groupers: pushing it to them
* refresh the load and push the updated LoadMapping to all groupers whenever refreshLoadTimer fires
* update tests to reflect the change
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1fd83a9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1fd83a9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1fd83a9
Branch: refs/heads/master
Commit: a1fd83a987995430269cd25ef360f703e7eb8f99
Parents: 8f63d5a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Aug 7 23:51:39 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Aug 7 23:51:39 2017 +0900
----------------------------------------------------------------------
.../org/apache/storm/daemon/GrouperFactory.java | 10 ++++++
.../org/apache/storm/daemon/worker/Worker.java | 11 ++++++-
.../jvm/org/apache/storm/executor/Executor.java | 19 +++++++++++
.../apache/storm/executor/ExecutorShutdown.java | 6 ++++
.../apache/storm/executor/IRunningExecutor.java | 2 ++
.../grouping/LoadAwareCustomStreamGrouping.java | 1 +
.../grouping/LoadAwareShuffleGrouping.java | 28 +++-------------
.../grouping/LoadAwareShuffleGroupingTest.java | 34 ++++++++++++++++----
8 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 7c61136..03e029e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -119,6 +119,11 @@ public class GrouperFactory {
}
@Override
+ public void refreshLoad(LoadMapping loadMapping) {
+
+ }
+
+ @Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
return customStreamGrouping.chooseTasks(taskId, values);
}
@@ -224,6 +229,11 @@ public class GrouperFactory {
// A no-op grouper
public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
@Override
+ public void refreshLoad(LoadMapping loadMapping) {
+
+ }
+
+ @Override
public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 59e86c7..0b2ab40 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -265,7 +265,7 @@ public class Worker implements Shutdownable, DaemonCommon {
// The jitter allows the clients to get the data at different times, and avoids thundering herd
if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
}
workerState.refreshConnectionsTimer.scheduleRecurring(0,
@@ -285,6 +285,15 @@ public class Worker implements Shutdownable, DaemonCommon {
}
+ public void doRefreshLoad() {
+ workerState.refreshLoad();
+
+ final List<IRunningExecutor> executors = executorsAtom.get();
+ for (IRunningExecutor executor : executors) {
+ executor.loadChanged(workerState.getLoadMapping());
+ }
+ }
+
public void doHeartBeat() throws IOException {
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index c1c6350..5f2adf7 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -25,9 +25,11 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,6 +56,7 @@ import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.stats.BoltExecutorStats;
@@ -79,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
public abstract class Executor implements Callable, EventHandler<Object> {
@@ -102,6 +106,7 @@ public abstract class Executor implements Callable, EventHandler<Object> {
protected CommonStats stats;
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
+ protected final List<LoadAwareCustomStreamGrouping> groupers;
protected final ReportErrorAndDie reportErrorDie;
protected final Callable<Boolean> sampler;
protected ExecutorTransfer executorTransfer;
@@ -162,6 +167,13 @@ public abstract class Executor implements Callable, EventHandler<Object> {
this.intervalToTaskToMetricToRegistry = new HashMap<>();
this.taskToComponent = workerData.getTaskToComponent();
this.streamToComponentToGrouper = outboundComponents(workerTopologyContext, componentId, topoConf);
+ if (this.streamToComponentToGrouper != null) {
+ this.groupers = streamToComponentToGrouper.values().stream()
+ .filter(Objects::nonNull)
+ .flatMap(m -> m.values().stream()).collect(Collectors.toList());
+ } else {
+ this.groupers = Collections.emptyList();
+ }
this.reportError = new ReportError(topoConf, stormClusterState, stormId, componentId, workerTopologyContext);
this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
this.sampler = ConfigUtils.mkStatsSampler(topoConf);
@@ -340,6 +352,12 @@ public abstract class Executor implements Callable, EventHandler<Object> {
}
}
+ public void reflectNewLoadMapping(LoadMapping loadMapping) {
+ for (LoadAwareCustomStreamGrouping g : groupers) {
+ g.refreshLoad(loadMapping);
+ }
+ }
+
private void registerBackpressure() {
receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
@Override
@@ -589,4 +607,5 @@ public abstract class Executor implements Callable, EventHandler<Object> {
}
return ret;
}
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index 144ee1b..7ea48b0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -23,6 +23,7 @@ import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.Task;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.spout.ISpout;
import org.apache.storm.task.IBolt;
@@ -69,6 +70,11 @@ public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
}
@Override
+ public void loadChanged(LoadMapping loadMapping) {
+ executor.reflectNewLoadMapping(loadMapping);
+ }
+
+ @Override
public boolean getBackPressureFlag() {
return executor.getBackpressure();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index 441fdff..e7a4117 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -19,6 +19,7 @@ package org.apache.storm.executor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.grouping.LoadMapping;
import java.util.List;
@@ -27,5 +28,6 @@ public interface IRunningExecutor {
ExecutorStats renderStats();
List<Long> getExecutorId();
void credentialsChanged(Credentials credentials);
+ void loadChanged(LoadMapping loadMapping);
boolean getBackPressureFlag();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
index d825d2e..fba7254 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareCustomStreamGrouping.java
@@ -20,5 +20,6 @@ package org.apache.storm.grouping;
import java.util.List;
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
+ void refreshLoad(LoadMapping loadMapping);
List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 84e4612..20437a1 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -33,9 +33,6 @@ import org.apache.storm.task.WorkerTopologyContext;
public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
private static final int CAPACITY_TASK_MULTIPLICATION = 100;
- @VisibleForTesting
- static final int CHECK_UPDATE_INDEX = 100;
-
private Random random;
private List<Integer>[] rets;
private int[] targets;
@@ -43,10 +40,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private AtomicInteger current;
private int actualCapacity = 0;
- private AtomicInteger skipCheckingUpdateCount;
- private AtomicBoolean isUpdating;
- private long lastUpdate = 0;
-
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
random = new Random();
@@ -70,8 +63,6 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
Collections.shuffle(choices, random);
current = new AtomicInteger(0);
- skipCheckingUpdateCount = new AtomicInteger(0);
- isUpdating = new AtomicBoolean(false);
}
@Override
@@ -80,21 +71,12 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
@Override
- public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
- if (skipCheckingUpdateCount.incrementAndGet() == CHECK_UPDATE_INDEX) {
- skipCheckingUpdateCount.set(0);
- if ((lastUpdate + 1000) < System.currentTimeMillis()
- && isUpdating.compareAndSet(false, true)) {
- // before finishing updateRing(), concurrent call will still rely on old choices
- updateRing(load);
-
- // update time and open a chance to update ring again
- lastUpdate = System.currentTimeMillis();
- skipCheckingUpdateCount.set(0);
- isUpdating.set(false);
- }
- }
+ public void refreshLoad(LoadMapping loadMapping) {
+ updateRing(loadMapping);
+ }
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
int rightNow;
int size = choices.size();
while (true) {
http://git-wip-us.apache.org/repos/asf/storm/blob/a1fd83a9/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 667f36d..ed08a05 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -18,10 +18,13 @@
package org.apache.storm.grouping;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.storm.StormTimer;
import org.apache.storm.daemon.GrouperFactory;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Utils;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -38,6 +41,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -90,9 +96,7 @@ public class LoadAwareShuffleGroupingTest {
grouper.prepare(context, null, availableTaskIds);
// force triggers building ring
- for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- }
+ grouper.refreshLoad(loadMapping);
// calling chooseTasks should be finished before refreshing ring
// adjusting groupingExecutionsPerThread might be needed with really slow machine
@@ -209,6 +213,9 @@ public class LoadAwareShuffleGroupingTest {
loadInfoMap.put(2, 0.0);
load.setLocal(loadInfoMap);
+ // force triggers building ring
+ shuffler.refreshLoad(load);
+
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3];
for (int i = 0 ; i < numMessages ; i++) {
@@ -246,6 +253,9 @@ public class LoadAwareShuffleGroupingTest {
loadInfoMap.put(2, 0.0);
load.setLocal(loadInfoMap);
+ // force triggers building ring
+ shuffler.refreshLoad(load);
+
List<Object> data = Lists.newArrayList(1, 2);
int[] frequencies = new int[3]; // task id starts from 1
for (int i = 0 ; i < numMessages ; i++) {
@@ -354,9 +364,7 @@ public class LoadAwareShuffleGroupingTest {
int inputTaskId = 100;
// force triggers building ring
- for (int i = 0 ; i < LoadAwareShuffleGrouping.CHECK_UPDATE_INDEX ; i++) {
- grouper.chooseTasks(inputTaskId, Lists.newArrayList(), loadMapping);
- }
+ grouper.refreshLoad(loadMapping);
for (int i = 1; i <= totalEmits; i++) {
List<Integer> taskIds = grouper
@@ -413,6 +421,11 @@ public class LoadAwareShuffleGroupingTest {
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
grouper.prepare(context, null, availableTaskIds);
+ // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+ ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(1));
+ refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -433,6 +446,8 @@ public class LoadAwareShuffleGroupingTest {
}
LOG.info("Duration: {} ms", (System.currentTimeMillis() - current));
+
+ refreshService.shutdownNow();
}
private void runMultithreadedBenchmark(LoadAwareCustomStreamGrouping grouper,
@@ -446,6 +461,11 @@ public class LoadAwareShuffleGroupingTest {
// Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
+ // periodically calls refreshLoad in 1 sec to simulate worker load update timer
+ ScheduledExecutorService refreshService = MoreExecutors.getExitingScheduledExecutorService(
+ new ScheduledThreadPoolExecutor(1));
+ refreshService.scheduleAtFixedRate(() -> grouper.refreshLoad(loadMapping), 1, 1, TimeUnit.SECONDS);
+
long current = System.currentTimeMillis();
int idx = 0;
while (true) {
@@ -495,5 +515,7 @@ public class LoadAwareShuffleGroupingTest {
}
LOG.info("Max duration among threads is : {} ms", maxDurationMillis);
+
+ refreshService.shutdownNow();
}
}
\ No newline at end of file