Posted to commits@storm.apache.org by et...@apache.org on 2020/03/19 13:57:46 UTC
[storm] branch master updated: STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new fb78f00 STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)
fb78f00 is described below
commit fb78f00849474392269137b190390a210826259f
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Mar 19 08:57:27 2020 -0500
STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)
---
.../storm/grouping/LoadAwareShuffleGrouping.java | 62 +++++++++--------
.../grouping/LoadAwareShuffleGroupingTest.java | 78 +++++++++++++++++++++-
2 files changed, 111 insertions(+), 29 deletions(-)
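
In short: transition() used to downgrade the locality scope whenever the average
load across the current scope fell below the low water mark, without checking
whether the narrower scope's own tasks could absorb the traffic. With the test's
bounds (0.8 high, 0.2 low) and per-task loads of 0.2/0.1/0.1, the old code at
HOST_LOCAL averaged 0.15, switched down to WORKER_LOCAL, and funneled all tuples
onto the one task still sitting at the 0.2 threshold. The fix downgrades only
when the downgraded scope's own average is below the low water mark. A minimal
sketch of the corrected decision, consolidated from the added lines in the diff
below (avg is the average load across the current scope's targets):

    LocalityScope nextScope = currentScope;
    if (avg > higherBound) {
        // too hot: widen the scope one step
        nextScope = LocalityScope.upgrade(currentScope);
    } else {
        // narrow the scope only if the narrower scope can take the load
        LocalityScope lowerScope = LocalityScope.downgrade(currentScope);
        List<Integer> lowerTargets = getTargetsInScope(lowerScope);
        if (!lowerTargets.isEmpty()) {
            double lowerAvg = lowerTargets.stream()
                .mapToDouble((key) -> load.get(key)).average().getAsDouble();
            if (lowerAvg < lowerBound) {
                nextScope = lowerScope;
            }
        }
    }
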
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 9056403..f969ade 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -47,13 +47,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private Random random;
private volatile int[] prepareChoices;
private AtomicInteger current;
- private Scope currentScope;
+ private LocalityScope currentScope;
private NodeInfo sourceNodeInfo;
private List<Integer> targetTasks;
private AtomicReference<Map<Integer, NodeInfo>> taskToNodePort;
private Map<String, Object> conf;
private DNSToSwitchMapping dnsToSwitchMapping;
- private Map<Scope, List<Integer>> localityGroup;
+ private Map<LocalityScope, List<Integer>> localityGroup;
private double higherBound;
private double lowerBound;
@@ -67,7 +67,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
conf = context.getConf();
dnsToSwitchMapping = ReflectionUtils.newInstance((String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN));
localityGroup = new HashMap<>();
- currentScope = Scope.WORKER_LOCAL;
+ currentScope = LocalityScope.WORKER_LOCAL;
higherBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND));
lowerBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND));
@@ -116,7 +116,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
localityGroup.values().stream().forEach(v -> v.clear());
for (int target : targetTasks) {
- Scope scope = calculateScope(cachedTaskToNodePort, hostToRack, target);
+ LocalityScope scope = calculateScope(cachedTaskToNodePort, hostToRack, target);
if (!localityGroup.containsKey(scope)) {
localityGroup.put(scope, new ArrayList<>());
}
@@ -124,23 +124,23 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
}
- private List<Integer> getTargetsInScope(Scope scope) {
+ private List<Integer> getTargetsInScope(LocalityScope scope) {
List<Integer> rets = new ArrayList<>();
List<Integer> targetInScope = localityGroup.get(scope);
if (null != targetInScope) {
rets.addAll(targetInScope);
}
- Scope downgradeScope = Scope.downgrade(scope);
+ LocalityScope downgradeScope = LocalityScope.downgrade(scope);
if (downgradeScope != scope) {
rets.addAll(getTargetsInScope(downgradeScope));
}
return rets;
}
- private Scope transition(LoadMapping load) {
+ private LocalityScope transition(LoadMapping load) {
List<Integer> targetInScope = getTargetsInScope(currentScope);
if (targetInScope.isEmpty()) {
- Scope upScope = Scope.upgrade(currentScope);
+ LocalityScope upScope = LocalityScope.upgrade(currentScope);
if (upScope == currentScope) {
throw new RuntimeException("The current scope " + currentScope + " has no target tasks.");
}
@@ -153,16 +153,19 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
double avg = targetInScope.stream().mapToDouble((key) -> load.get(key)).average().getAsDouble();
- Scope nextScope;
- if (avg < lowerBound) {
- nextScope = Scope.downgrade(currentScope);
- if (getTargetsInScope(nextScope).isEmpty()) {
- nextScope = currentScope;
- }
- } else if (avg > higherBound) {
- nextScope = Scope.upgrade(currentScope);
+
+ LocalityScope nextScope = currentScope;
+ if (avg > higherBound) {
+ nextScope = LocalityScope.upgrade(currentScope);
} else {
- nextScope = currentScope;
+ LocalityScope lowerScope = LocalityScope.downgrade(currentScope);
+ List<Integer> lowerTargets = getTargetsInScope(lowerScope);
+ if (!lowerTargets.isEmpty()) {
+ double lowerAvg = lowerTargets.stream().mapToDouble((key) -> load.get(key)).average().getAsDouble();
+ if (lowerAvg < lowerBound) {
+ nextScope = lowerScope;
+ }
+ }
}
return nextScope;
@@ -170,7 +173,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
private synchronized void updateRing(LoadMapping load) {
refreshLocalityGroup();
- Scope prevScope = currentScope;
+ LocalityScope prevScope = currentScope;
currentScope = transition(load);
if (currentScope != prevScope) {
//reset all the weights
@@ -246,11 +249,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
arr[j] = tmp;
}
- private Scope calculateScope(Map<Integer, NodeInfo> taskToNodePort, Map<String, String> hostToRack, int target) {
+ private LocalityScope calculateScope(Map<Integer, NodeInfo> taskToNodePort, Map<String, String> hostToRack, int target) {
NodeInfo targetNodeInfo = taskToNodePort.get(target);
if (targetNodeInfo == null) {
- return Scope.EVERYTHING;
+ return LocalityScope.EVERYTHING;
}
String sourceRack = hostToRack.get(sourceNodeInfo.get_node());
@@ -259,13 +262,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
if (sourceRack != null && targetRack != null && sourceRack.equals(targetRack)) {
if (sourceNodeInfo.get_node().equals(targetNodeInfo.get_node())) {
if (sourceNodeInfo.get_port().equals(targetNodeInfo.get_port())) {
- return Scope.WORKER_LOCAL;
+ return LocalityScope.WORKER_LOCAL;
}
- return Scope.HOST_LOCAL;
+ return LocalityScope.HOST_LOCAL;
}
- return Scope.RACK_LOCAL;
+ return LocalityScope.RACK_LOCAL;
} else {
- return Scope.EVERYTHING;
+ return LocalityScope.EVERYTHING;
}
}
@@ -289,10 +292,15 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
return capacity;
}
- enum Scope {
+ @VisibleForTesting
+ public LocalityScope getCurrentScope() {
+ return currentScope;
+ }
+
+ enum LocalityScope {
WORKER_LOCAL, HOST_LOCAL, RACK_LOCAL, EVERYTHING;
- public static Scope downgrade(Scope current) {
+ public static LocalityScope downgrade(LocalityScope current) {
switch (current) {
case EVERYTHING:
return RACK_LOCAL;
@@ -305,7 +313,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
}
}
- public static Scope upgrade(Scope current) {
+ public static LocalityScope upgrade(LocalityScope current) {
switch (current) {
case WORKER_LOCAL:
return HOST_LOCAL;
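
A note before the test diff: the hunks above show only fragments of upgrade()
and downgrade(). The full ladder they implement, with the endpoint behavior
inferred from the callers (transition() treats upgrade(s) == s as "nowhere left
to go", and getTargetsInScope() stops recursing once downgrade(s) == s), would
check out as:

    // Assumed ladder semantics (endpoint cases are inferred, not shown
    // verbatim in the truncated hunks above):
    //   WORKER_LOCAL <-> HOST_LOCAL <-> RACK_LOCAL <-> EVERYTHING
    assertEquals(RACK_LOCAL, LocalityScope.downgrade(EVERYTHING));
    assertEquals(WORKER_LOCAL, LocalityScope.downgrade(WORKER_LOCAL)); // floor
    assertEquals(HOST_LOCAL, LocalityScope.upgrade(WORKER_LOCAL));
    assertEquals(EVERYTHING, LocalityScope.upgrade(EVERYTHING));       // ceiling
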
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2ad8848..678d803 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -51,14 +51,17 @@ public class LoadAwareShuffleGroupingTest {
public static final double ACCEPTABLE_MARGIN = 0.015;
private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
- private WorkerTopologyContext mockContext(List<Integer> availableTaskIds) {
+ private Map<String, Object> createConf() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
conf.put(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND, 0.8);
conf.put(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND, 0.2);
+ return conf;
+ }
+ private WorkerTopologyContext mockContext(List<Integer> availableTaskIds) {
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
- when(context.getConf()).thenReturn(conf);
+ when(context.getConf()).thenReturn(createConf());
Map<Integer, NodeInfo> taskNodeToPort = new HashMap<>();
NodeInfo nodeInfo = new NodeInfo("node-id", Sets.newHashSet(6700L));
availableTaskIds.forEach(e -> taskNodeToPort.put(e, nodeInfo));
@@ -496,4 +499,75 @@ public class LoadAwareShuffleGroupingTest {
refreshService.shutdownNow();
}
+
+ @Test
+ public void testLoadSwitching() throws Exception {
+ LoadAwareShuffleGrouping grouping = new LoadAwareShuffleGrouping();
+ WorkerTopologyContext context = createLoadSwitchingContext();
+ grouping.prepare(context, new GlobalStreamId("a", "default"), Arrays.asList(1, 2, 3));
+ // startup should default to worker local
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.WORKER_LOCAL, grouping.getCurrentScope());
+
+ // with high load, switch to host local
+ LoadMapping lm = createLoadMapping(1.0, 1.0, 1.0);
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+ // load remains high, switch to rack local
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.RACK_LOCAL, grouping.getCurrentScope());
+
+ // load remains high. switch to everything
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.EVERYTHING, grouping.getCurrentScope());
+
+ // lower load below low water threshold, but worker local load remains too high
+ // should switch to rack local
+ lm = createLoadMapping(0.2, 0.1, 0.1);
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.RACK_LOCAL, grouping.getCurrentScope());
+
+ // lower load continues, switch to host local
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+ // lower load continues, should NOT be able to switch to worker local yet
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+ // reduce load on local worker task, should switch to worker local
+ lm = createLoadMapping(0.1, 0.1, 0.1);
+ grouping.refreshLoad(lm);
+ assertEquals(LoadAwareShuffleGrouping.LocalityScope.WORKER_LOCAL, grouping.getCurrentScope());
+ }
+
+ private LoadMapping createLoadMapping(double load1, double load2, double load3) {
+ Map<Integer, Double> localLoad = new HashMap<>();
+ localLoad.put(1, load1);
+ localLoad.put(2, load2);
+ localLoad.put(3, load3);
+ LoadMapping lm = new LoadMapping();
+ lm.setLocal(localLoad);
+ return lm;
+ }
+
+ // creates a WorkerTopologyContext with 3 tasks, one worker local, one host local,
+ // and one rack local
+ private WorkerTopologyContext createLoadSwitchingContext() {
+ WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+ when(context.getConf()).thenReturn(createConf());
+ Map<Integer, NodeInfo> taskNodeToPort = new HashMap<>();
+
+ // worker local task
+ taskNodeToPort.put(1, new NodeInfo("node-id", Sets.newHashSet(6701L)));
+ // node local task
+ taskNodeToPort.put(2, new NodeInfo("node-id", Sets.newHashSet(6702L)));
+ // rack local task
+ taskNodeToPort.put(3, new NodeInfo("node-id2", Sets.newHashSet(6703L)));
+
+ when(context.getTaskToNodePort()).thenReturn(new AtomicReference<>(taskNodeToPort));
+ when(context.getThisWorkerHost()).thenReturn("node-id");
+ when(context.getThisWorkerPort()).thenReturn(6701);
+ return context;
+ }
}
\ No newline at end of file
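
For completeness, the water marks exercised by this fix are ordinary topology
configs. A hypothetical setup fragment (the class and method names are
illustrative; the 0.8/0.2 values mirror the test above, not confirmed defaults):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.Config;

    public class LocalityBoundsExample {
        static Map<String, Object> localityBounds() {
            Map<String, Object> conf = new HashMap<>();
            // widen scope when the current scope's average load exceeds this
            conf.put(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND, 0.8);
            // after STORM-3602, narrow scope only when the narrower scope's
            // average load is below this
            conf.put(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND, 0.2);
            return conf;
        }
    }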