Posted to commits@storm.apache.org by et...@apache.org on 2020/03/19 13:58:34 UTC

[storm] branch 2.1.x-branch updated: STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new de646bf  STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)
de646bf is described below

commit de646bf5bc110edb60f3456b49c01264a439b382
Author: agresch <ag...@gmail.com>
AuthorDate: Thu Mar 19 08:57:27 2020 -0500

    STORM-3602 fix switching on low water mark for loadaware shuffle (#3227)
---
 .../storm/grouping/LoadAwareShuffleGrouping.java   | 62 +++++++++--------
 .../grouping/LoadAwareShuffleGroupingTest.java     | 78 +++++++++++++++++++++-
 2 files changed, 111 insertions(+), 29 deletions(-)
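
In short: transition() previously downgraded the locality scope whenever the
average load of the *current* scope fell below the low water mark, even if the
tasks in the narrower scope were themselves still overloaded. With this fix,
the downgrade decision looks at the average load of the next-lower scope
instead. A minimal sketch of the revised rule follows; avgLoad(...) and
hasTargets(...) are hypothetical helpers for illustration only, and the
committed code appears in the diff below:

    private LocalityScope nextScope(LocalityScope current, LoadMapping load) {
        // Too hot: widen the scope one step (fixed point at EVERYTHING).
        if (avgLoad(current, load) > higherBound) {
            return LocalityScope.upgrade(current);
        }
        // The fix: only narrow in when the NARROWER scope's own average
        // load sits below the low water mark, rather than whenever the
        // current scope's average does.
        LocalityScope lower = LocalityScope.downgrade(current);
        if (lower != current && hasTargets(lower)
                && avgLoad(lower, load) < lowerBound) {
            return lower;
        }
        return current;
    }
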

diff --git a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
index 9056403..f969ade 100644
--- a/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
+++ b/storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
@@ -47,13 +47,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
     private Random random;
     private volatile int[] prepareChoices;
     private AtomicInteger current;
-    private Scope currentScope;
+    private LocalityScope currentScope;
     private NodeInfo sourceNodeInfo;
     private List<Integer> targetTasks;
     private AtomicReference<Map<Integer, NodeInfo>> taskToNodePort;
     private Map<String, Object> conf;
     private DNSToSwitchMapping dnsToSwitchMapping;
-    private Map<Scope, List<Integer>> localityGroup;
+    private Map<LocalityScope, List<Integer>> localityGroup;
     private double higherBound;
     private double lowerBound;
 
@@ -67,7 +67,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         conf = context.getConf();
         dnsToSwitchMapping = ReflectionUtils.newInstance((String) conf.get(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN));
         localityGroup = new HashMap<>();
-        currentScope = Scope.WORKER_LOCAL;
+        currentScope = LocalityScope.WORKER_LOCAL;
         higherBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND));
         lowerBound = ObjectReader.getDouble(conf.get(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND));
 
@@ -116,7 +116,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         localityGroup.values().stream().forEach(v -> v.clear());
 
         for (int target : targetTasks) {
-            Scope scope = calculateScope(cachedTaskToNodePort, hostToRack, target);
+            LocalityScope scope = calculateScope(cachedTaskToNodePort, hostToRack, target);
             if (!localityGroup.containsKey(scope)) {
                 localityGroup.put(scope, new ArrayList<>());
             }
@@ -124,23 +124,23 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
     }
 
-    private List<Integer> getTargetsInScope(Scope scope) {
+    private List<Integer> getTargetsInScope(LocalityScope scope) {
         List<Integer> rets = new ArrayList<>();
         List<Integer> targetInScope = localityGroup.get(scope);
         if (null != targetInScope) {
             rets.addAll(targetInScope);
         }
-        Scope downgradeScope = Scope.downgrade(scope);
+        LocalityScope downgradeScope = LocalityScope.downgrade(scope);
         if (downgradeScope != scope) {
             rets.addAll(getTargetsInScope(downgradeScope));
         }
         return rets;
     }
 
-    private Scope transition(LoadMapping load) {
+    private LocalityScope transition(LoadMapping load) {
         List<Integer> targetInScope = getTargetsInScope(currentScope);
         if (targetInScope.isEmpty()) {
-            Scope upScope = Scope.upgrade(currentScope);
+            LocalityScope upScope = LocalityScope.upgrade(currentScope);
             if (upScope == currentScope) {
                 throw new RuntimeException("The current scope " + currentScope + " has no target tasks.");
             }
@@ -153,16 +153,19 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         }
 
         double avg = targetInScope.stream().mapToDouble((key) -> load.get(key)).average().getAsDouble();
-        Scope nextScope;
-        if (avg < lowerBound) {
-            nextScope = Scope.downgrade(currentScope);
-            if (getTargetsInScope(nextScope).isEmpty()) {
-                nextScope = currentScope;
-            }
-        } else if (avg > higherBound) {
-            nextScope = Scope.upgrade(currentScope);
+
+        LocalityScope nextScope = currentScope;
+        if (avg > higherBound) {
+            nextScope = LocalityScope.upgrade(currentScope);
         } else {
-            nextScope = currentScope;
+            LocalityScope lowerScope = LocalityScope.downgrade(currentScope);
+            List<Integer> lowerTargets = getTargetsInScope(lowerScope);
+            if (!lowerTargets.isEmpty()) {
+                double lowerAvg = lowerTargets.stream().mapToDouble((key) -> load.get(key)).average().getAsDouble();
+                if (lowerAvg < lowerBound) {
+                    nextScope = lowerScope;
+                }
+            }
         }
 
         return nextScope;
@@ -170,7 +173,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
 
     private synchronized void updateRing(LoadMapping load) {
         refreshLocalityGroup();
-        Scope prevScope = currentScope;
+        LocalityScope prevScope = currentScope;
         currentScope = transition(load);
         if (currentScope != prevScope) {
             //reset all the weights
@@ -246,11 +249,11 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         arr[j] = tmp;
     }
 
-    private Scope calculateScope(Map<Integer, NodeInfo> taskToNodePort, Map<String, String> hostToRack, int target) {
+    private LocalityScope calculateScope(Map<Integer, NodeInfo> taskToNodePort, Map<String, String> hostToRack, int target) {
         NodeInfo targetNodeInfo = taskToNodePort.get(target);
 
         if (targetNodeInfo == null) {
-            return Scope.EVERYTHING;
+            return LocalityScope.EVERYTHING;
         }
 
         String sourceRack = hostToRack.get(sourceNodeInfo.get_node());
@@ -259,13 +262,13 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         if (sourceRack != null && targetRack != null && sourceRack.equals(targetRack)) {
             if (sourceNodeInfo.get_node().equals(targetNodeInfo.get_node())) {
                 if (sourceNodeInfo.get_port().equals(targetNodeInfo.get_port())) {
-                    return Scope.WORKER_LOCAL;
+                    return LocalityScope.WORKER_LOCAL;
                 }
-                return Scope.HOST_LOCAL;
+                return LocalityScope.HOST_LOCAL;
             }
-            return Scope.RACK_LOCAL;
+            return LocalityScope.RACK_LOCAL;
         } else {
-            return Scope.EVERYTHING;
+            return LocalityScope.EVERYTHING;
         }
     }
 
@@ -289,10 +292,15 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
         return capacity;
     }
 
-    enum Scope {
+    @VisibleForTesting
+    public LocalityScope getCurrentScope() {
+        return currentScope;
+    }
+
+    enum LocalityScope {
         WORKER_LOCAL, HOST_LOCAL, RACK_LOCAL, EVERYTHING;
 
-        public static Scope downgrade(Scope current) {
+        public static LocalityScope downgrade(LocalityScope current) {
             switch (current) {
                 case EVERYTHING:
                     return RACK_LOCAL;
@@ -305,7 +313,7 @@ public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping,
             }
         }
 
-        public static Scope upgrade(Scope current) {
+        public static LocalityScope upgrade(LocalityScope current) {
             switch (current) {
                 case WORKER_LOCAL:
                     return HOST_LOCAL;
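
For orientation before the test changes: upgrade() and downgrade() walk a
fixed ladder of scopes, with fixed points at both ends; the
"upScope == currentScope" and "downgradeScope != scope" checks in transition()
above rely on exactly that. Sketched as assertions (illustrative only, not
part of the commit):

    // WORKER_LOCAL <-> HOST_LOCAL <-> RACK_LOCAL <-> EVERYTHING
    assert LocalityScope.upgrade(LocalityScope.WORKER_LOCAL) == LocalityScope.HOST_LOCAL;
    assert LocalityScope.upgrade(LocalityScope.EVERYTHING) == LocalityScope.EVERYTHING;
    assert LocalityScope.downgrade(LocalityScope.EVERYTHING) == LocalityScope.RACK_LOCAL;
    assert LocalityScope.downgrade(LocalityScope.WORKER_LOCAL) == LocalityScope.WORKER_LOCAL;
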
diff --git a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
index 2ad8848..678d803 100644
--- a/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/grouping/LoadAwareShuffleGroupingTest.java
@@ -51,14 +51,17 @@ public class LoadAwareShuffleGroupingTest {
     public static final double ACCEPTABLE_MARGIN = 0.015;
     private static final Logger LOG = LoggerFactory.getLogger(LoadAwareShuffleGroupingTest.class);
 
-    private WorkerTopologyContext mockContext(List<Integer> availableTaskIds) {
+    private Map<String, Object> createConf() {
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
         conf.put(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND, 0.8);
         conf.put(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND, 0.2);
+        return conf;
+    }
 
+    private WorkerTopologyContext mockContext(List<Integer> availableTaskIds) {
         WorkerTopologyContext context = mock(WorkerTopologyContext.class);
-        when(context.getConf()).thenReturn(conf);
+        when(context.getConf()).thenReturn(createConf());
         Map<Integer, NodeInfo> taskNodeToPort = new HashMap<>();
         NodeInfo nodeInfo = new NodeInfo("node-id", Sets.newHashSet(6700L));
         availableTaskIds.forEach(e -> taskNodeToPort.put(e, nodeInfo));
@@ -496,4 +499,75 @@ public class LoadAwareShuffleGroupingTest {
 
         refreshService.shutdownNow();
     }
+
+    @Test
+    public void testLoadSwitching() throws Exception {
+        LoadAwareShuffleGrouping grouping = new LoadAwareShuffleGrouping();
+        WorkerTopologyContext context = createLoadSwitchingContext();
+        grouping.prepare(context, new GlobalStreamId("a", "default"), Arrays.asList(1, 2, 3));
+        // startup should default to worker local
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.WORKER_LOCAL, grouping.getCurrentScope());
+
+        // with high load, switch to host local
+        LoadMapping lm = createLoadMapping(1.0, 1.0, 1.0);
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+        // load remains high, switch to rack local
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.RACK_LOCAL, grouping.getCurrentScope());
+
+        // load remains high. switch to everything
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.EVERYTHING, grouping.getCurrentScope());
+
+        // lower load below low water threshold, but worker local load remains too high
+        // should switch to rack local
+        lm = createLoadMapping(0.2, 0.1, 0.1);
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.RACK_LOCAL, grouping.getCurrentScope());
+
+        // lower load continues, switch to host local
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+        // lower load continues, should NOT be able to switch to worker local yet
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.HOST_LOCAL, grouping.getCurrentScope());
+
+        // reduce load on local worker task, should switch to worker local
+        lm = createLoadMapping(0.1, 0.1, 0.1);
+        grouping.refreshLoad(lm);
+        assertEquals(LoadAwareShuffleGrouping.LocalityScope.WORKER_LOCAL, grouping.getCurrentScope());
+    }
+
+    private LoadMapping createLoadMapping(double load1, double load2, double load3) {
+        Map<Integer, Double> localLoad = new HashMap<>();
+        localLoad.put(1, load1);
+        localLoad.put(2, load2);
+        localLoad.put(3, load3);
+        LoadMapping lm = new LoadMapping();
+        lm.setLocal(localLoad);
+        return lm;
+    }
+
+    // creates a WorkerTopologyContext with 3 tasks, one worker local, one host local,
+    // and one rack local
+    private WorkerTopologyContext createLoadSwitchingContext() {
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        when(context.getConf()).thenReturn(createConf());
+        Map<Integer, NodeInfo> taskNodeToPort = new HashMap<>();
+
+        // worker local task
+        taskNodeToPort.put(1, new NodeInfo("node-id", Sets.newHashSet(6701L)));
+        // node local task
+        taskNodeToPort.put(2, new NodeInfo("node-id", Sets.newHashSet(6702L)));
+        // rack local task
+        taskNodeToPort.put(3, new NodeInfo("node-id2", Sets.newHashSet(6703L)));
+
+        when(context.getTaskToNodePort()).thenReturn(new AtomicReference<>(taskNodeToPort));
+        when(context.getThisWorkerHost()).thenReturn("node-id");
+        when(context.getThisWorkerPort()).thenReturn(6701);
+        return context;
+    }
 }
\ No newline at end of file
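
The two water marks consulted by this logic are topology-level settings, read
in prepare() and stubbed by createConf() in the test above. A minimal sketch
of setting them at submit time; the 0.8/0.2 values simply mirror the test
defaults and are not recommendations:

    import org.apache.storm.Config;

    Config conf = new Config();
    // Above this average load the grouping widens its scope
    // (WORKER_LOCAL -> HOST_LOCAL -> RACK_LOCAL -> EVERYTHING).
    conf.put(Config.TOPOLOGY_LOCALITYAWARE_HIGHER_BOUND, 0.8);
    // With this fix, the next-narrower scope must average below this
    // bound before the grouping narrows back in.
    conf.put(Config.TOPOLOGY_LOCALITYAWARE_LOWER_BOUND, 0.2);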