Posted to commits@storm.apache.org by et...@apache.org on 2019/09/05 21:14:36 UTC

[storm] branch master updated: STORM-3482 Implement One Worker Per Component Option (#3113)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new c427119  STORM-3482 Implement One Worker Per Component Option (#3113)
c427119 is described below

commit c427119f24bc0b14f81706ab4ad03404aa85aede
Author: dandsager1 <50...@users.noreply.github.com>
AuthorDate: Thu Sep 5 16:14:31 2019 -0500

    STORM-3482 Implement One Worker Per Component Option (#3113)
---
 conf/defaults.yaml                                 |  1 +
 storm-client/src/jvm/org/apache/storm/Config.java  | 10 +++
 .../apache/storm/scheduler/TopologyDetails.java    |  4 ++
 .../apache/storm/scheduler/resource/RasNode.java   | 44 ++++++++++---
 .../scheduler/resource/ResourceAwareScheduler.java |  9 +++
 .../scheduling/BaseResourceAwareStrategy.java      | 12 +---
 .../scheduling/DefaultResourceAwareStrategy.java   |  3 -
 .../TestDefaultResourceAwareStrategy.java          | 73 +++++++++++++++-------
 8 files changed, 113 insertions(+), 43 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e555ac8..43a623b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -337,6 +337,7 @@ resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource
 topology.ras.constraint.max.state.search: 10_000     # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
 resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
 topology.ras.one.executor.per.worker: false
+topology.ras.one.component.per.worker: false
 
 blacklist.scheduler.tolerance.time.secs: 300
 blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b46c112..434ec9c 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -318,10 +318,20 @@ public class Config extends HashMap<String, Object> {
     /**
      * Whether to limit each worker to one executor. This is useful for debugging topologies to clearly identify workers that
      * are slow/crashing and for estimating resource requirements and capacity.
+     * If both {@link #TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER} and {@link #TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER} are enabled,
+     * {@link #TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER} is ignored.
      */
     @IsBoolean
     public static final String TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER = "topology.ras.one.executor.per.worker";
     /**
+     * Whether to limit each worker to one component. This is useful for debugging topologies to clearly identify workers that
+     * are slow/crashing and for estimating resource requirements and capacity.
+     * If both TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER and TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER are enabled,
+     * TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER is ignored.
+     */
+    @IsBoolean
+    public static final String TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER = "topology.ras.one.component.per.worker";
+    /**
      * The maximum number of seconds to spend scheduling a topology using the constraint solver.  Null means no limit.
      */
     @IsInteger
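
To try the new option out, the usual route is to set it on the topology configuration at submit time. A minimal sketch, assuming storm-client is on the classpath (the two RAS keys are the real config constants added above; the class name is illustrative only):

    import org.apache.storm.Config;

    public class OneComponentPerWorkerExample {
        public static void main(String[] args) {
            Config conf = new Config();
            // Restrict each worker to executors of a single component.
            conf.put(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER, true);
            // Leave the executor-level restriction off; if both were true,
            // the component-level option would be ignored by the scheduler.
            conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, false);
            // conf is then passed to StormSubmitter.submitTopology(...) as usual.
            System.out.println(conf);
        }
    }
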
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 660f3d8..9b8298e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -244,6 +244,10 @@ public class TopologyDetails {
         return ret;
     }
 
+    public String getComponentFromExecutor(ExecutorDetails exec) {
+        return executorToComponent.get(exec);
+    }
+
     /**
      * Gets the on heap memory requirement for a certain task within a topology.
      *
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
index 9d12428..2050bc3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
+import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SupervisorDetails;
@@ -363,14 +365,40 @@ public class RasNode {
      */
     public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
         assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
-        return isAlive
-               && cluster.wouldFit(
-            ws,
-            exec,
-            td,
-            getTotalAvailableResources(),
-            td.getTopologyWorkerMaxHeapSize()
-        );
+        if (!isAlive || !cluster.wouldFit(
+                ws,
+                exec,
+                td,
+                getTotalAvailableResources(),
+                td.getTopologyWorkerMaxHeapSize())) {
+            return false;
+        }
+
+        boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+        boolean oneComponentPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+
+        if (oneExecutorPerWorker) {
+            return !getUsedSlots(td.getId()).contains(ws);
+        }
+
+        if (oneComponentPerWorker) {
+            Set<String> components = new HashSet<>();
+            Map<String, Collection<ExecutorDetails>> topologyExecutors = topIdToUsedSlots.get(td.getId());
+            if (topologyExecutors != null) {
+                Collection<ExecutorDetails> slotExecs = topologyExecutors.get(ws.getId());
+                if (slotExecs != null) {
+                    // components from WorkerSlot
+                    for (ExecutorDetails slotExec : slotExecs) {
+                        components.add(td.getComponentFromExecutor(slotExec));
+                    }
+                    // component from exec
+                    components.add(td.getComponentFromExecutor(exec));
+                }
+            }
+            return components.size() <= 1;
+        }
+
+        return true;
     }
 
     /**
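
The component restriction in wouldFit reduces to a set-cardinality check over the executors already in the slot plus the candidate executor. A standalone sketch of the same idea, with a generic executor type and a componentOf function standing in for TopologyDetails.getComponentFromExecutor (names here are illustrative, not Storm APIs):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Function;

    public class OneComponentCheckSketch {
        // True when adding 'candidate' to a slot already holding 'assigned' executors
        // leaves at most one distinct component in that slot.
        static <E> boolean slotStaysSingleComponent(Collection<E> assigned, E candidate,
                                                    Function<E, String> componentOf) {
            Set<String> components = new HashSet<>();
            for (E exec : assigned) {
                components.add(componentOf.apply(exec));
            }
            components.add(componentOf.apply(candidate));
            return components.size() <= 1;
        }
    }
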
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 43ef699..ff5f526 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -152,6 +152,15 @@ public class ResourceAwareScheduler implements IScheduler {
             return;
         }
 
+        // Log warning here to avoid duplicating / spamming in strategy / scheduling code.
+        boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+        boolean oneComponentPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+        if (oneExecutorPerWorker && oneComponentPerWorker) {
+            LOG.warn("Conflicting options: {} and {} are both set! Ignoring {} option.",
+                Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER,
+                Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+        }
+
         final IStrategy finalRasStrategy = rasStrategy;
         for (int i = 0; i < maxSchedulingAttempts; i++) {
             SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
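
When both flags are set on a topology, the SLF4J placeholders above expand to a single warning along the lines of (logger prefix omitted):

    Conflicting options: topology.ras.one.executor.per.worker and topology.ras.one.component.per.worker are both set! Ignoring topology.ras.one.component.per.worker option.
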
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 55279b8..4e5ec5c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
     protected Cluster cluster;
-    private boolean oneExecutorPerWorker = false;
     // Rack id to list of host names in that rack
     private Map<String, List<String>> networkTopography;
     private final Map<String, String> superIdToRack = new HashMap<>();
@@ -91,10 +90,6 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
         //NOOP
     }
 
-    protected void setOneExecutorPerWorker(boolean oneExecutorPerWorker) {
-        this.oneExecutorPerWorker = oneExecutorPerWorker;
-    }
-
     protected SchedulingResult mkNotEnoughResources(TopologyDetails td) {
         return  SchedulingResult.failure(
             SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
@@ -152,12 +147,9 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
         for (String id : sortedNodes) {
             RasNode node = nodes.getNodeById(id);
             if (node.couldEverFit(exec, td)) {
-                Collection<WorkerSlot> topologyUsedSlots = oneExecutorPerWorker ? node.getUsedSlots(td.getId()) : Collections.emptySet();
                 for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
-                    if (!topologyUsedSlots.contains(ws)) {
-                        if (node.wouldFit(ws, exec, td)) {
-                            return ws;
-                        }
+                    if (node.wouldFit(ws, exec, td)) {
+                        return ws;
                     }
                 }
             }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9b2a7bd..6c3c1f7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -39,9 +39,6 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
 
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
-        boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
-        setOneExecutorPerWorker(oneExecutorPerWorker);
-
         prepare(cluster);
         if (nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 31d8ecb..30f2bb7 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +82,11 @@ public class TestDefaultResourceAwareStrategy {
         SHARED_OFF_HEAP_WORKER,
         SHARED_ON_HEAP_WORKER
     };
+    private enum WorkerRestrictionType {
+        WORKER_RESTRICTION_ONE_EXECUTOR,
+        WORKER_RESTRICTION_ONE_COMPONENT,
+        WORKER_RESTRICTION_NONE
+    };
 
     private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
         private final Map<String, String> result;
@@ -274,8 +278,8 @@ public class TestDefaultResourceAwareStrategy {
      * test if the scheduling shared memory is correct with/without oneExecutorPerWorker enabled
      */
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testDefaultResourceAwareStrategySharedMemory(boolean oneExecutorPerWorker) {
+    @EnumSource(WorkerRestrictionType.class)
+    public void testDefaultResourceAwareStrategySharedMemory(WorkerRestrictionType schedulingLimitation) {
         int spoutParallelism = 2;
         int boltParallelism = 2;
         int numBolts = 3;
@@ -305,7 +309,14 @@ public class TestDefaultResourceAwareStrategy {
         conf.put(Config.TOPOLOGY_PRIORITY, 0);
         conf.put(Config.TOPOLOGY_NAME, "testTopology");
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
-        conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, oneExecutorPerWorker);
+        switch (schedulingLimitation) {
+            case WORKER_RESTRICTION_ONE_EXECUTOR:
+                conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+                break;
+            case WORKER_RESTRICTION_ONE_COMPONENT:
+                conf.put(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER, true);
+                break;
+        }
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
                 genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
 
@@ -316,17 +327,23 @@ public class TestDefaultResourceAwareStrategy {
         scheduler.prepare(conf);
         scheduler.schedule(topologies, cluster);
 
-        // one worker per executor scheduling
         // [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
         // spout  [0,0] [1,1]
         // bolt-1 [2,2] [3,3]
         // bolt-2 [6,6] [7,7]
         // bolt-3 [4,4] [5,5]
-        //
-        // expect 8 workers over 2 nodes
+
+        // WorkerRestrictionType.WORKER_RESTRICTION_NONE
+        // expect 1 worker, 1 node
+
+        // WorkerRestrictionType.WORKER_RESTRICTION_ONE_EXECUTOR
+        // expect 8 workers, 2 nodes
         // node r000s000 workers: bolt-1 bolt-2 spout bolt-1 (no memory sharing)
         // node r000s001 workers: bolt-2 spout bolt-3 bolt-3 (no memory sharing)
 
+        // WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT
+        // expect 4 workers, 1 node
+
         for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
             String supervisorId = entry.getKey();
             SupervisorResources resources = entry.getValue();
@@ -334,14 +351,17 @@ public class TestDefaultResourceAwareStrategy {
             assertTrue(supervisorId, resources.getTotalMem() >= resources.getUsedMem());
         }
 
-        if (!oneExecutorPerWorker) {
+        int totalNumberOfTasks = spoutParallelism + boltParallelism * numBolts;
+        SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+        TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+        long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+
+        if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
             // Everything should fit in a single slot
-            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
             double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
             double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
             double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
 
-            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
             assertThat(assignment.getSlots().size(), is(1));
             WorkerSlot ws = assignment.getSlots().iterator().next();
             String nodeId = ws.getNodeId();
@@ -354,12 +374,7 @@ public class TestDefaultResourceAwareStrategy {
             assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
             assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
             assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
-        } else {
-            // one worker per executor
-            int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
-            TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
-
-            // get expected mem on topology rather than per executor
+        } else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_EXECUTOR) {
             double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeapWithinWorker;
             double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
             double expectedMemSharedOnHeap = 2 * sharedOnHeapWithinWorker;
@@ -375,16 +390,30 @@ public class TestDefaultResourceAwareStrategy {
 
             double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
             assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
-
-            // expect 8 workers
-            SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
             int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
             assertThat(numAssignedWorkers, is(8));
             assertThat(assignment.getSlots().size(), is(8));
-
-            // expect 2 nodes
-            long numNodes = assignment.getSlotToExecutors().keySet().stream().map(ws -> ws.getNodeId()).distinct().count();
             assertThat(numNodes, is(2L));
+        } else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
+            double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+            double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
+            double expectedMemSharedOnHeap = sharedOnHeapWithinWorker;
+            double expectedMemSharedOffHeap = sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
+            double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
+            double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
+            assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(expectedMemOffHeap, 0.01));
+            assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(expectedMemSharedOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(expectedMemSharedOffHeap, 0.01));
+            assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(expectedMemNonSharedOnHeap, 0.01));
+            assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(expectedMemNonSharedOffHeap, 0.01));
+
+            double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+            assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
+            int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
+            assertThat(numAssignedWorkers, is(4));
+            assertThat(assignment.getSlots().size(), is(4));
+            assertThat(numNodes, is(1L));
         }
     }
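
As a quick sanity check on the worker counts asserted above, the arithmetic follows directly from the test's parallelism settings; a small sketch (class name is illustrative):

    public class ExpectedWorkerCounts {
        public static void main(String[] args) {
            int spoutParallelism = 2;
            int boltParallelism = 2;
            int numBolts = 3;
            int executors = spoutParallelism + boltParallelism * numBolts; // 2 + 2*3 = 8
            int components = 1 + numBolts;                                 // spout + 3 bolts = 4

            System.out.println("no restriction         -> 1 worker (everything fits in one slot)");
            System.out.println("one executor per worker-> " + executors + " workers");
            System.out.println("one component per worker-> " + components + " workers");
        }
    }
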