Posted to commits@storm.apache.org by et...@apache.org on 2020/06/02 15:26:05 UTC

[storm] branch master updated: [STORM-2687] Add network proximity needs based executor sorting method (#3265)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 013654d  [STORM-2687] Add network proximity needs based executor sorting method (#3265)
013654d is described below

commit 013654d9cb415c4119d2bdf753351f5af5394fad
Author: Rui Li <ru...@verizonmedia.com>
AuthorDate: Tue Jun 2 10:25:50 2020 -0500

    [STORM-2687] Add network proximity needs based executor sorting method (#3265)
    
    * [STORM-2687] Add network proximity needs based executor sorting method
    
    Co-authored-by: Ethan Li <et...@gmail.com>
---
 storm-client/src/jvm/org/apache/storm/Config.java  |   7 +
 .../java/org/apache/storm/scheduler/Component.java |  29 ++--
 .../apache/storm/scheduler/TopologyDetails.java    |   6 +-
 .../scheduling/BaseResourceAwareStrategy.java      | 169 ++++++++++++++++++++-
 .../TestDefaultResourceAwareStrategy.java          |  59 +++++++
 .../TestGenericResourceAwareStrategy.java          |  63 ++++++++
 6 files changed, 320 insertions(+), 13 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 3316f42..c27b468 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -313,6 +313,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
 
     /**
+     * When DefaultResourceAwareStrategy or GenericResourceAwareStrategy is used,
+     * the scheduler sorts unassigned executors in a particular order.
+     * If this config is set to true, unassigned executors will be sorted in topological order, taking network proximity needs into account.
+     */
+    public static final String TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS = "topology.ras.order.executors.by.proximity.needs";
+
+    /**
      * Declare scheduling constraints for a topology used by the constraint solver strategy. The format can be either
      * old style (validated by ListOfListOfStringValidator.class) or the newer style, which is a list of a specific type of
      * Maps (validated by RasConstraintsTypeValidator.class). The value must be in one or the other format.
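
Since the new key is read from the topology conf, a topology opts in at submission time; when the flag is absent or false, the existing connection-count ordering is kept. A minimal sketch of enabling it, assuming a hypothetical topology name and an already-populated builder (this snippet is illustrative, not part of the commit):

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class SubmitWithProximityOrdering {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... declare spouts and bolts on the builder here ...
            Config conf = new Config();
            // opt into topological ordering with network proximity needs
            conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
            StormSubmitter.submitTopology("proximity-demo", conf, builder.createTopology());
        }
    }
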
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
index c4a28c3..7f5953a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
@@ -18,10 +18,14 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
 
 public class Component {
     private final String id;
@@ -29,6 +33,7 @@ public class Component {
     private final ComponentType type;
     private final Set<String> parents = new HashSet<>();
     private final Set<String> children = new HashSet<>();
+    private final Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
 
     /**
      * Create a new component.
@@ -37,10 +42,11 @@ public class Component {
      * @param compId the id of the component
      * @param execs  the executors for this component.
      */
-    public Component(ComponentType type, String compId, List<ExecutorDetails> execs) {
+    public Component(ComponentType type, String compId, List<ExecutorDetails> execs, Map<GlobalStreamId, Grouping> inputs) {
         this.type = type;
         this.id = compId;
         this.execs = execs;
+        this.inputs.putAll(inputs);
     }
 
     /**
@@ -73,16 +79,19 @@ public class Component {
         return children;
     }
 
+    public Map<GlobalStreamId, Grouping> getInputs() {
+        return inputs;
+    }
+
     @Override
     public String toString() {
-        return "{id: "
-               + getId()
-               + " Parents: "
-               + getParents()
-               + " Children: "
-               + getChildren()
-               + " Execs: "
-               + getExecs()
-               + "}";
+        return "Component{"
+            + "id='" + id + '\''
+            + ", execs=" + execs
+            + ", type=" + type
+            + ", parents=" + parents
+            + ", children=" + children
+            + ", inputs=" + inputs
+            + '}';
     }
 }
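
The inputs map added above is keyed by GlobalStreamId (upstream component id plus stream id), with the subscribing Grouping as the value; this is what later lets the strategy test whether a parent-to-child edge is shuffle-grouped. A small illustrative helper, assuming a Component obtained from TopologyDetails.getComponents() (the helper itself is not part of this commit):

    import java.util.Map;
    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.generated.Grouping;
    import org.apache.storm.scheduler.Component;

    final class InputsSketch {
        /** Print each upstream edge of a component and whether it is shuffle-style. */
        static void dumpInputs(Component component) {
            for (Map.Entry<GlobalStreamId, Grouping> in : component.getInputs().entrySet()) {
                String parentId = in.getKey().get_componentId(); // upstream component id
                String streamId = in.getKey().get_streamId();    // subscribed stream
                Grouping grouping = in.getValue();
                boolean shuffleLike = grouping.is_set_shuffle() || grouping.is_set_local_or_shuffle();
                System.out.printf("%s/%s -> %s (shuffle=%b)%n",
                    parentId, streamId, component.getId(), shuffleLike);
            }
        }
    }
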
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index be20834..551a94b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -207,8 +207,9 @@ public class TopologyDetails {
         if (spouts != null) {
             for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
                 String compId = entry.getKey();
+                SpoutSpec spout = entry.getValue();
                 if (!Utils.isSystemId(compId)) {
-                    Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId));
+                    Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId), spout.get_common().get_inputs());
                     ret.put(compId, comp);
                 }
             }
@@ -216,8 +217,9 @@ public class TopologyDetails {
         if (bolts != null) {
             for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
                 String compId = entry.getKey();
+                Bolt bolt = entry.getValue();
                 if (!Utils.isSystemId(compId)) {
-                    Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId));
+                    Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId), bolt.get_common().get_inputs());
                     ret.put(compId, comp);
                 }
             }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index d6a1bc3..87345a0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -35,7 +35,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
@@ -51,6 +54,7 @@ import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -511,6 +515,18 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
         return sortedComponents;
     }
 
+    protected List<ExecutorDetails> orderExecutors(
+        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+        boolean orderByProximity = ObjectReader.getBoolean(
+            td.getConf().get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS), false);
+        if (!orderByProximity) {
+            return orderExecutorsDefault(td, unassignedExecutors);
+        } else {
+            LOG.info("{} is set to true", Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
+            return orderExecutorsByProximityNeeds(td, unassignedExecutors);
+        }
+    }
+
     /**
      * Order executors based on how many in and out connections they will potentially need to make, in descending order. First order
      * components by the number of in and out connections each will have.  Then iterate through the sorted list of components. For each
@@ -522,7 +538,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      *                            this list
      * @return a list of executors in sorted order
      */
-    protected List<ExecutorDetails> orderExecutors(
+    private List<ExecutorDetails> orderExecutorsDefault(
         TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
         Map<String, Component> componentMap = td.getComponents();
         List<ExecutorDetails> execsScheduled = new LinkedList<>();
@@ -570,6 +586,157 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
     }
 
     /**
+     * Order executors by network proximity needs.
+     * @param td The topology the executors belong to
+     * @param unassignedExecutors a collection of unassigned executors that need to be assigned. Should only try to
+     *     assign executors from this list
+     * @return a list of executors in sorted order
+     */
+    private List<ExecutorDetails> orderExecutorsByProximityNeeds(
+        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+        Map<String, Component> componentMap = td.getComponents();
+        List<ExecutorDetails> execsScheduled = new LinkedList<>();
+
+        Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
+        for (Component component : componentMap.values()) {
+            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
+            for (ExecutorDetails exec : component.getExecs()) {
+                if (unassignedExecutors.contains(exec)) {
+                    compToExecsToSchedule.get(component.getId()).add(exec);
+                }
+            }
+        }
+
+        List<Component> sortedComponents = topologicalSortComponents(componentMap);
+
+        for (Component currComp: sortedComponents) {
+            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
+            for (int i = 0; i < numExecs; i++) {
+                execsScheduled.addAll(takeExecutors(currComp, componentMap, compToExecsToSchedule));
+            }
+        }
+
+        return execsScheduled;
+    }
+
+    /**
+     * Sort components topologically.
+     * @param componentMap The map of component Id to Component Object.
+     * @return The sorted components
+     */
+    private List<Component> topologicalSortComponents(final Map<String, Component> componentMap) {
+        List<Component> sortedComponents = new ArrayList<>();
+        boolean[] visited = new boolean[componentMap.size()];
+        int[] inDegree = new int[componentMap.size()];
+        List<String> componentIds = new ArrayList<>(componentMap.keySet());
+        Map<String, Integer> compIdToIndex = new HashMap<>();
+        for (int i = 0; i < componentIds.size(); i++) {
+            compIdToIndex.put(componentIds.get(i), i);
+        }
+        //initialize the in-degree array
+        for (int i = 0; i < inDegree.length; i++) {
+            String compId = componentIds.get(i);
+            Component comp = componentMap.get(compId);
+            for (String childId : comp.getChildren()) {
+                inDegree[compIdToIndex.get(childId)] += 1;
+            }
+        }
+        // sort components topologically (Kahn's algorithm: repeatedly take a component whose remaining in-degree is 0)
+        for (int t = 0; t < inDegree.length; t++) {
+            for (int i = 0; i < inDegree.length; i++) {
+                if (inDegree[i] == 0 && !visited[i]) {
+                    String compId = componentIds.get(i);
+                    Component comp = componentMap.get(compId);
+                    sortedComponents.add(comp);
+                    visited[i] = true;
+                    for (String childId : comp.getChildren()) {
+                        inDegree[compIdToIndex.get(childId)]--;
+                    }
+                    break;
+                }
+            }
+        }
+        return sortedComponents;
+    }
+
+    /**
+     * Take unscheduled executors from the current component and all of its downstream components in a particular order.
+     * First, take one executor from the current component;
+     * then for every child (direct downstream component) of this component:
+     *     if the grouping from the current component to this child is a shuffle grouping,
+     *         the number of executors to take from this child is the max of
+     *         1 and (the number of unscheduled executors this child has / the number of unscheduled executors the current component has);
+     *     otherwise, the number of executors to take is 1;
+     *     for every executor to take from this child, call takeExecutors(...) recursively.
+     * @param currComp The current component.
+     * @param componentMap The map from component Id to component object.
+     * @param compToExecsToSchedule The map from component Id to unscheduled executors.
+     * @return The executors to schedule in order.
+     */
+    private List<ExecutorDetails> takeExecutors(Component currComp,
+                                                final Map<String, Component> componentMap,
+                                                final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
+        List<ExecutorDetails> execsScheduled = new ArrayList<>();
+        Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get(currComp.getId());
+        int currUnscheduledNumExecs = currQueue.size();
+        // Defensive check only; given the call pattern above, the queue should never be empty here.
+        if (currUnscheduledNumExecs == 0) {
+            return execsScheduled;
+        }
+        execsScheduled.add(currQueue.poll());
+        Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
+        for (String childId: sortedChildren) {
+            Component childComponent = componentMap.get(childId);
+            Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId);
+            int childUnscheduledNumExecs = childQueue.size();
+            if (childUnscheduledNumExecs == 0) {
+                continue;
+            }
+            int numExecsToTake = 1;
+            if (hasShuffleGroupingFromParentToChild(currComp, childComponent)) {
+                // shuffle grouping: take a proportional share (integer division truncates)
+                numExecsToTake = Math.max(1, childUnscheduledNumExecs / currUnscheduledNumExecs);
+            } // otherwise, one-by-one
+            for (int i = 0; i < numExecsToTake; i++) {
+                execsScheduled.addAll(takeExecutors(childComponent, componentMap, compToExecsToSchedule));
+            }
+        }
+        return execsScheduled;
+    }
+
+    private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) {
+        Set<String> children = component.getChildren();
+        Set<String> sortedChildren =
+            new TreeSet<>((o1, o2) -> {
+                Component child1 = componentMap.get(o1);
+                Component child2 = componentMap.get(o2);
+                boolean child1IsShuffle = hasShuffleGroupingFromParentToChild(component, child1);
+                boolean child2IsShuffle = hasShuffleGroupingFromParentToChild(component, child2);
+                if (child1IsShuffle && child2IsShuffle) {
+                    return o1.compareTo(o2);
+                } else if (child1IsShuffle) {
+                    return 1;
+                } else if (child2IsShuffle) {
+                    return -1;
+                } else {
+                    // neither edge is shuffle-grouped; order by id to keep the comparator consistent
+                    return o1.compareTo(o2);
+                }
+            });
+        sortedChildren.addAll(children);
+        return sortedChildren;
+    }
+
+    private boolean hasShuffleGroupingFromParentToChild(Component parent, Component child) {
+        for (Map.Entry<GlobalStreamId, Grouping> inputEntry: child.getInputs().entrySet()) {
+            GlobalStreamId globalStreamId = inputEntry.getKey();
+            Grouping grouping = inputEntry.getValue();
+            if (globalStreamId.get_componentId().equals(parent.getId())
+                && (grouping.is_set_local_or_shuffle() || grouping.is_set_shuffle())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
      * Get a list of all the spouts in the topology.
      *
      * @param td topology to get spouts from
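
To see what the proximity ordering above produces, consider a chain spout (1 executor) -> boltA (2) -> boltB (2) with shuffle groupings throughout: takeExecutors(...) emits s0, a0, b0, a1, b1, interleaving executors along the chain instead of grouping them by component, so later slot packing tends to co-locate directly connected executors. A standalone sketch that mirrors the recursion on toy data (plain strings, not Storm classes, every edge assumed shuffle-grouped):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;

    public class ProximityOrderSketch {
        static final Map<String, List<String>> CHILDREN = new HashMap<>();
        static final Map<String, Queue<String>> TO_SCHEDULE = new HashMap<>();

        // Mirrors takeExecutors(...): take one from the current component, then
        // a proportional share from each child, recursively.
        static List<String> take(String comp) {
            List<String> out = new ArrayList<>();
            Queue<String> queue = TO_SCHEDULE.get(comp);
            int unscheduled = queue.size();
            if (unscheduled == 0) {
                return out;
            }
            out.add(queue.poll());
            for (String child : CHILDREN.get(comp)) {
                int childUnscheduled = TO_SCHEDULE.get(child).size();
                if (childUnscheduled == 0) {
                    continue;
                }
                // shuffle grouping: take a proportional share from the child
                int numToTake = Math.max(1, childUnscheduled / unscheduled);
                for (int i = 0; i < numToTake; i++) {
                    out.addAll(take(child));
                }
            }
            return out;
        }

        public static void main(String[] args) {
            CHILDREN.put("spout", Arrays.asList("boltA"));
            CHILDREN.put("boltA", Arrays.asList("boltB"));
            CHILDREN.put("boltB", Collections.emptyList());
            TO_SCHEDULE.put("spout", new LinkedList<>(Arrays.asList("s0")));
            TO_SCHEDULE.put("boltA", new LinkedList<>(Arrays.asList("a0", "a1")));
            TO_SCHEDULE.put("boltB", new LinkedList<>(Arrays.asList("b0", "b1")));
            // Prints [s0, a0, b0, a1, b1]: connected executors stay adjacent in the order.
            System.out.println(take("spout"));
        }
    }
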
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index a2f2437..3fd0d73 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -475,6 +475,65 @@ public class TestDefaultResourceAwareStrategy {
     }
 
     /**
+     * Test whether the scheduling logic of the DefaultResourceAwareStrategy is correct when executors are ordered by network proximity needs.
+     */
+    @Test
+    public void testDefaultResourceAwareStrategyInFavorOfShuffle() {
+        int spoutParallelism = 1;
+        int boltParallelism = 2;
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+            spoutParallelism);
+        builder.setBolt("bolt-1", new TestBolt(),
+            boltParallelism).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestBolt(),
+            boltParallelism).shuffleGrouping("bolt-1");
+        builder.setBolt("bolt-3", new TestBolt(),
+            boltParallelism).shuffleGrouping("bolt-2");
+
+        StormTopology stormTopology = builder.createTopology();
+
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 150, 1500);
+        Config conf = createClusterConfig(50, 250, 250, null);
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
+        conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
+
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormTopology, 0,
+            genExecsAndComps(stormTopology), CURRENT_TIME, "user");
+
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(conf, new StormMetricsRegistry());
+        rs.schedule(topologies, cluster);
+        // expected slot-to-executor assignment: <[[[0, 0], [6, 6], [2, 2]], [[3, 3]], [[5, 5], [4, 4], [1, 1]]]>
+
+        HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
+        expectedScheduling.add(new HashSet<>(Arrays.asList(
+            new ExecutorDetails(0, 0), //spout
+            new ExecutorDetails(6, 6), //bolt-2
+            new ExecutorDetails(2, 2)))); //bolt-1
+        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3
+        expectedScheduling.add(new HashSet<>(Arrays.asList(
+            new ExecutorDetails(5, 5), //bolt-2
+            new ExecutorDetails(4, 4), //bolt-3
+            new ExecutorDetails(1, 1)))); //bolt-1
+        HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
+        SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");
+        for (Collection<ExecutorDetails> execs : assignment.getSlotToExecutors().values()) {
+            foundScheduling.add(new HashSet<>(execs));
+        }
+
+        Assert.assertEquals(expectedScheduling, foundScheduling);
+    }
+
+    /**
      * Test whether strategy will choose correct rack
      */
     @Test
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 51252ad..79796e6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -301,6 +301,69 @@ public class TestGenericResourceAwareStrategy {
         assertTopologiesFullyScheduled(cluster, gpu2);
     }
     
+    /**
+     * Test whether the scheduling logic of the GenericResourceAwareStrategy is correct when executors are ordered by network proximity needs (in favor of shuffle groupings).
+     */
+    @Test
+    public void testGenericResourceAwareStrategyInFavorOfShuffle() {
+        int spoutParallelism = 1;
+        int boltParallelism = 2;
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+            spoutParallelism);
+        builder.setBolt("bolt-1", new TestBolt(),
+            boltParallelism).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestBolt(),
+            boltParallelism).shuffleGrouping("bolt-1").addResource("gpu.count", 1.0);
+        builder.setBolt("bolt-3", new TestBolt(),
+            boltParallelism).shuffleGrouping("bolt-2").addResource("gpu.count", 2.0);
+
+        StormTopology stormTopology = builder.createTopology();
+
+        INimbus iNimbus = new INimbusTest();
+
+        Config conf = createGrasClusterConfig(50, 250, 250, null, Collections.emptyMap());
+        Map<String, Double> genericResourcesMap = new HashMap<>();
+        genericResourcesMap.put("gpu.count", 2.0);
+        Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 150, 1500, genericResourcesMap);
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
+        conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
+
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormTopology, 0,
+            genExecsAndComps(stormTopology), currentTime, "user");
+
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(conf, new StormMetricsRegistry());
+        rs.schedule(topologies, cluster);
+
+        HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
+        expectedScheduling.add(new HashSet<>(Arrays.asList(
+            new ExecutorDetails(0, 0),
+            new ExecutorDetails(2, 2),
+            new ExecutorDetails(6, 6))));
+        expectedScheduling.add(new HashSet<>(Arrays.asList(
+            new ExecutorDetails(4, 4),
+            new ExecutorDetails(1, 1))));
+        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(5, 5))));
+        expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3))));
+        HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
+        SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");
+        for (Collection<ExecutorDetails> execs : assignment.getSlotToExecutors().values()) {
+            foundScheduling.add(new HashSet<>(execs));
+        }
+
+        assertEquals(expectedScheduling, foundScheduling);
+    }
+
     @Test
     public void testAntiAffinityWithMultipleTopologies() {
         INimbus iNimbus = new INimbusTest();