You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/05 21:14:36 UTC
[storm] branch master updated: STORM-3482 Implement One Worker Per Component Option (#3113)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new c427119 STORM-3482 Implement One Worker Per Component Option (#3113)
c427119 is described below
commit c427119f24bc0b14f81706ab4ad03404aa85aede
Author: dandsager1 <50...@users.noreply.github.com>
AuthorDate: Thu Sep 5 16:14:31 2019 -0500
STORM-3482 Implement One Worker Per Component Option (#3113)
---
conf/defaults.yaml | 1 +
storm-client/src/jvm/org/apache/storm/Config.java | 10 +++
.../apache/storm/scheduler/TopologyDetails.java | 4 ++
.../apache/storm/scheduler/resource/RasNode.java | 44 ++++++++++---
.../scheduler/resource/ResourceAwareScheduler.java | 9 +++
.../scheduling/BaseResourceAwareStrategy.java | 12 +---
.../scheduling/DefaultResourceAwareStrategy.java | 3 -
.../TestDefaultResourceAwareStrategy.java | 73 +++++++++++++++-------
8 files changed, 113 insertions(+), 43 deletions(-)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e555ac8..43a623b 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -337,6 +337,7 @@ resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource
topology.ras.constraint.max.state.search: 10_000 # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
topology.ras.one.executor.per.worker: false
+topology.ras.one.component.per.worker: false
blacklist.scheduler.tolerance.time.secs: 300
blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b46c112..434ec9c 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -318,10 +318,20 @@ public class Config extends HashMap<String, Object> {
/**
* Whether to limit each worker to one executor. This is useful for debugging topologies to clearly identify workers that
* are slow/crashing and for estimating resource requirements and capacity.
+ * If both {@link #TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER} and {@link #TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER} are enabled,
+ * {@link #TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER} is ignored.
*/
@IsBoolean
public static final String TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER = "topology.ras.one.executor.per.worker";
/**
+ * Whether to limit each worker to one component. This is useful for debugging topologies to clearly identify workers that
+ * are slow/crashing and for estimating resource requirements and capacity.
+ * If both TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER and TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER are enabled,
+ * TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER is ignored.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER = "topology.ras.one.component.per.worker";
+ /**
* The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit.
*/
@IsInteger
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 660f3d8..9b8298e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -244,6 +244,10 @@ public class TopologyDetails {
return ret;
}
+ public String getComponentFromExecutor(ExecutorDetails exec) {
+ return executorToComponent.get(exec);
+ }
+
/**
* Gets the on heap memory requirement for a certain task within a topology.
*
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
index 9d12428..2050bc3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RasNode.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
+import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
@@ -363,14 +365,40 @@ public class RasNode {
*/
public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
- return isAlive
- && cluster.wouldFit(
- ws,
- exec,
- td,
- getTotalAvailableResources(),
- td.getTopologyWorkerMaxHeapSize()
- );
+ if (!isAlive || !cluster.wouldFit(
+ ws,
+ exec,
+ td,
+ getTotalAvailableResources(),
+ td.getTopologyWorkerMaxHeapSize())) {
+ return false;
+ }
+
+ boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+ boolean oneComponentPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+
+ if (oneExecutorPerWorker) {
+ return !getUsedSlots(td.getId()).contains(ws);
+ }
+
+ if (oneComponentPerWorker) {
+ Set<String> components = new HashSet<>();
+ Map<String, Collection<ExecutorDetails>> topologyExecutors = topIdToUsedSlots.get(td.getId());
+ if (topologyExecutors != null) {
+ Collection<ExecutorDetails> slotExecs = topologyExecutors.get(ws.getId());
+ if (slotExecs != null) {
+ // components from WorkerSlot
+ for (ExecutorDetails slotExec : slotExecs) {
+ components.add(td.getComponentFromExecutor(slotExec));
+ }
+ // component from exec
+ components.add(td.getComponentFromExecutor(exec));
+ }
+ }
+ return components.size() <= 1;
+ }
+
+ return true;
}
/**
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 43ef699..ff5f526 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -152,6 +152,15 @@ public class ResourceAwareScheduler implements IScheduler {
return;
}
+ // Log warning here to avoid duplicating / spamming in strategy / scheduling code.
+ boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
+ boolean oneComponentPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+ if (oneExecutorPerWorker && oneComponentPerWorker) {
+ LOG.warn("Conflicting options: {} and {} are both set! Ignoring {} option.",
+ Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER,
+ Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER);
+ }
+
final IStrategy finalRasStrategy = rasStrategy;
for (int i = 0; i < maxSchedulingAttempts; i++) {
SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId());
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 55279b8..4e5ec5c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
public abstract class BaseResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
protected Cluster cluster;
- private boolean oneExecutorPerWorker = false;
// Rack id to list of host names in that rack
private Map<String, List<String>> networkTopography;
private final Map<String, String> superIdToRack = new HashMap<>();
@@ -91,10 +90,6 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
//NOOP
}
- protected void setOneExecutorPerWorker(boolean oneExecutorPerWorker) {
- this.oneExecutorPerWorker = oneExecutorPerWorker;
- }
-
protected SchedulingResult mkNotEnoughResources(TopologyDetails td) {
return SchedulingResult.failure(
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
@@ -152,12 +147,9 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
for (String id : sortedNodes) {
RasNode node = nodes.getNodeById(id);
if (node.couldEverFit(exec, td)) {
- Collection<WorkerSlot> topologyUsedSlots = oneExecutorPerWorker ? node.getUsedSlots(td.getId()) : Collections.emptySet();
for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
- if (!topologyUsedSlots.contains(ws)) {
- if (node.wouldFit(ws, exec, td)) {
- return ws;
- }
+ if (node.wouldFit(ws, exec, td)) {
+ return ws;
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9b2a7bd..6c3c1f7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -39,9 +39,6 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
@Override
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
- boolean oneExecutorPerWorker = (Boolean) td.getConf().get(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER);
- setOneExecutorPerWorker(oneExecutorPerWorker);
-
prepare(cluster);
if (nodes.getNodes().size() <= 0) {
LOG.warn("No available nodes to schedule tasks on!");
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 31d8ecb..30f2bb7 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +82,11 @@ public class TestDefaultResourceAwareStrategy {
SHARED_OFF_HEAP_WORKER,
SHARED_ON_HEAP_WORKER
};
+ private enum WorkerRestrictionType {
+ WORKER_RESTRICTION_ONE_EXECUTOR,
+ WORKER_RESTRICTION_ONE_COMPONENT,
+ WORKER_RESTRICTION_NONE
+ };
private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
private final Map<String, String> result;
@@ -274,8 +278,8 @@ public class TestDefaultResourceAwareStrategy {
* test if the scheduling shared memory is correct with/without oneExecutorPerWorker enabled
*/
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testDefaultResourceAwareStrategySharedMemory(boolean oneExecutorPerWorker) {
+ @EnumSource(WorkerRestrictionType.class)
+ public void testDefaultResourceAwareStrategySharedMemory(WorkerRestrictionType schedulingLimitation) {
int spoutParallelism = 2;
int boltParallelism = 2;
int numBolts = 3;
@@ -305,7 +309,14 @@ public class TestDefaultResourceAwareStrategy {
conf.put(Config.TOPOLOGY_PRIORITY, 0);
conf.put(Config.TOPOLOGY_NAME, "testTopology");
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
- conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, oneExecutorPerWorker);
+ switch (schedulingLimitation) {
+ case WORKER_RESTRICTION_ONE_EXECUTOR:
+ conf.put(Config.TOPOLOGY_RAS_ONE_EXECUTOR_PER_WORKER, true);
+ break;
+ case WORKER_RESTRICTION_ONE_COMPONENT:
+ conf.put(Config.TOPOLOGY_RAS_ONE_COMPONENT_PER_WORKER, true);
+ break;
+ }
TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
@@ -316,17 +327,23 @@ public class TestDefaultResourceAwareStrategy {
scheduler.prepare(conf);
scheduler.schedule(topologies, cluster);
- // one worker per executor scheduling
// [3,3] [7,7], [0,0] [2,2] [6,6] [1,1] [5,5] [4,4] sorted executor ordering
// spout [0,0] [1,1]
// bolt-1 [2,2] [3,3]
// bolt-2 [6,6] [7,7]
// bolt-3 [4,4] [5,5]
- //
- // expect 8 workers over 2 nodes
+
+ // WorkerRestrictionType.WORKER_RESTRICTION_NONE
+ // expect 1 worker, 1 node
+
+ // WorkerRestrictionType.WORKER_RESTRICTION_ONE_EXECUTOR
+ // expect 8 workers, 2 nodes
// node r000s000 workers: bolt-1 bolt-2 spout bolt-1 (no memory sharing)
// node r000s001 workers: bolt-2 spout bolt-3 bolt-3 (no memory sharing)
+ // WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT
+ // expect 4 workers, 1 node
+
for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
String supervisorId = entry.getKey();
SupervisorResources resources = entry.getValue();
@@ -334,14 +351,17 @@ public class TestDefaultResourceAwareStrategy {
assertTrue(supervisorId, resources.getTotalMem() >= resources.getUsedMem());
}
- if (!oneExecutorPerWorker) {
+ int totalNumberOfTasks = spoutParallelism + boltParallelism * numBolts;
+ SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
+ TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
+ long numNodes = assignment.getSlotToExecutors().keySet().stream().map(WorkerSlot::getNodeId).distinct().count();
+
+ if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_NONE) {
// Everything should fit in a single slot
- int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
- SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
assertThat(assignment.getSlots().size(), is(1));
WorkerSlot ws = assignment.getSlots().iterator().next();
String nodeId = ws.getNodeId();
@@ -354,12 +374,7 @@ public class TestDefaultResourceAwareStrategy {
assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
- } else {
- // one worker per executor
- int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
- TopologyResources topologyResources = cluster.getTopologyResourcesMap().get(topo.getId());
-
- // get expected mem on topology rather than per executor
+ } else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_EXECUTOR) {
double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + 2 * sharedOnHeapWithinWorker;
double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + 2 * sharedOffHeapWithinWorker + 2 * sharedOffHeapWithinNode;
double expectedMemSharedOnHeap = 2 * sharedOnHeapWithinWorker;
@@ -375,16 +390,30 @@ public class TestDefaultResourceAwareStrategy {
double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
-
- // expect 8 workers
- SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
assertThat(numAssignedWorkers, is(8));
assertThat(assignment.getSlots().size(), is(8));
-
- // expect 2 nodes
- long numNodes = assignment.getSlotToExecutors().keySet().stream().map(ws -> ws.getNodeId()).distinct().count();
assertThat(numNodes, is(2L));
+ } else if (schedulingLimitation == WorkerRestrictionType.WORKER_RESTRICTION_ONE_COMPONENT) {
+ double expectedMemOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+ double expectedMemOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
+ double expectedMemSharedOnHeap = sharedOnHeapWithinWorker;
+ double expectedMemSharedOffHeap = sharedOffHeapWithinWorker + sharedOffHeapWithinNode;
+ double expectedMemNonSharedOnHeap = totalNumberOfTasks * memoryOnHeap;
+ double expectedMemNonSharedOffHeap = totalNumberOfTasks * memoryOffHeap;
+ assertThat(topologyResources.getAssignedMemOnHeap(), closeTo(expectedMemOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedMemOffHeap(), closeTo(expectedMemOffHeap, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOnHeap(), closeTo(expectedMemSharedOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedSharedMemOffHeap(), closeTo(expectedMemSharedOffHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOnHeap(), closeTo(expectedMemNonSharedOnHeap, 0.01));
+ assertThat(topologyResources.getAssignedNonSharedMemOffHeap(), closeTo(expectedMemNonSharedOffHeap, 0.01));
+
+ double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
+ assertThat(topologyResources.getAssignedCpu(), closeTo(totalExpectedCPU, 0.01));
+ int numAssignedWorkers = cluster.getAssignedNumWorkers(topo);
+ assertThat(numAssignedWorkers, is(4));
+ assertThat(assignment.getSlots().size(), is(4));
+ assertThat(numNodes, is(1L));
}
}