You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2019/04/03 15:55:28 UTC
[storm] branch master updated: STORM-3366 allow configuring workers
to have a minimum cpu percentage
This is an automated email from the ASF dual-hosted git repository.
kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new ecca56d STORM-3366 allow configuring workers to have a minimum cpu percentage
new 2947fdb Merge pull request #2985 from agresch/agresch_STORM-3366
ecca56d is described below
commit ecca56d770ff8ec0ba514758ae5b908e1b5d65df
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 1 12:36:15 2019 -0500
STORM-3366 allow configuring workers to have a minimum cpu percentage
---
conf/defaults.yaml | 2 +
.../main/java/org/apache/storm/DaemonConfig.java | 11 +++
.../java/org/apache/storm/scheduler/Cluster.java | 42 +++++++++--
.../resource/TestResourceAwareScheduler.java | 83 ++++++++++++++++++++++
4 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 548f14c..db255d8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -389,6 +389,8 @@ storm.supervisor.low.memory.threshold.mb: 1024
storm.supervisor.medium.memory.threshold.mb: 1536
storm.supervisor.medium.memory.grace.period.ms: 30000
+storm.worker.min.cpu.pcore.percent: 0.0
+
storm.topology.classpath.beginning.enabled: false
worker.metrics:
"CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 8015c1d..addf28c 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1126,6 +1126,17 @@ public class DaemonConfig implements Validated {
public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
"storm.supervisor.medium.memory.grace.period.ms";
+ /**
+ * The config indicates the minimum percentage of cpu for a core that a worker will use. Assuming the core value to be
+ * 100, a value of 10 indicates 10% of the core. The P in PCORE represents the term "physical". A default value will be set for this
+ * config if the user does not override it.
+ * <P></P>
+ * Workers in containers or cgroups may require a minimum amount of CPU in order to launch within the supervisor timeout.
+ * This setting allows configuring that minimum amount of CPU.
+ */
+ @isPositiveNumber(includeZero = true)
+ public static String STORM_WORKER_MIN_CPU_PCORE_PERCENT = "storm.worker.min.cpu.pcore.percent";
+
// VALIDATION ONLY CONFIGS
// Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate
// That they reference a valid class. To allow this to happen we do part of the validation on the client side with annotations on
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index b8bc69a..46be6ca 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.Constants;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.nimbus.TopologyResources;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.WorkerResources;
@@ -85,6 +86,7 @@ public class Cluster implements ISchedulingState {
private SchedulerAssignmentImpl assignment;
private Set<String> blackListedHosts = new HashSet<>();
private INimbus inimbus;
+ private double minWorkerCpu = 0.0;
public Cluster(
INimbus nimbus,
@@ -157,6 +159,7 @@ public class Cluster implements ISchedulingState {
}
this.conf = conf;
this.topologies = topologies;
+ this.minWorkerCpu = ObjectReader.getDouble(conf.get(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT), 0.0);
ArrayList<String> supervisorHostNames = new ArrayList<>();
for (SupervisorDetails s : supervisors.values()) {
@@ -473,7 +476,7 @@ public class Cluster implements ISchedulingState {
for (SchedulerAssignment assignment: assignments.values()) {
for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
if (sd.getId().equals(entry.getKey().getNodeId())) {
- ret.remove(entry.getValue(), getResourceMetrics());
+ ret.remove(entry.getValue(), getResourceMetrics());
}
}
}
@@ -513,11 +516,19 @@ public class Cluster implements ISchedulingState {
);
}
sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
+
+ Map<String, Double> totalResourcesMap = totalResources.toNormalizedMap();
+ Double cpu = totalResources.getTotalCpu();
+ if (cpu < minWorkerCpu) {
+ cpu = minWorkerCpu;
+ totalResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ }
+
WorkerResources ret = new WorkerResources();
- ret.set_resources(totalResources.toNormalizedMap());
+ ret.set_resources(totalResourcesMap);
ret.set_shared_resources(sharedTotalResources);
+ ret.set_cpu(cpu);
- ret.set_cpu(totalResources.getTotalCpu());
ret.set_mem_off_heap(totalResources.getOffHeapMemoryMb());
ret.set_mem_on_heap(totalResources.getOnHeapMemoryMb());
ret.set_shared_mem_off_heap(
@@ -548,6 +559,9 @@ public class Cluster implements ISchedulingState {
double afterTotal = 0.0;
double afterOnHeap = 0.0;
+ double currentCpuTotal = 0.0;
+ double afterCpuTotal = 0.0;
+
Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
wouldBeAssigned.add(exec);
SchedulerAssignmentImpl assignment = assignments.get(td.getId());
@@ -558,6 +572,7 @@ public class Cluster implements ISchedulingState {
wouldBeAssigned.addAll(currentlyAssigned);
WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned);
currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
+ currentCpuTotal = wrCurrent.get_cpu();
}
WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
@@ -565,6 +580,25 @@ public class Cluster implements ISchedulingState {
currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+ afterCpuTotal = wrAfter.get_cpu();
+ } else {
+ WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+ afterCpuTotal = wrAfter.get_cpu();
+ }
+
+ double cpuAdded = afterCpuTotal - currentCpuTotal;
+ double cpuAvailable = resourcesAvailable.getTotalCpu();
+
+ if (cpuAdded > cpuAvailable) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
+ td.getName(),
+ exec,
+ ws,
+ cpuAdded,
+ cpuAvailable);
+ }
+ return false;
}
double memoryAdded = afterTotal - currentTotal;
@@ -967,7 +1001,7 @@ public class Cluster implements ISchedulingState {
}
/**
- * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot.
+ * This method updates ScheduledResources and UsedSlots cache for given workerSlot.
*/
private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
String nodeId = workerSlot.getNodeId();
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 592a3b6..99da7b0 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -945,6 +945,7 @@ public class TestResourceAwareScheduler {
scheduler.prepare(config);
scheduler.schedule(topologies, cluster);
+ assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
assertTrue("Topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
assertEquals("Topo-2 all executors scheduled?", 4, cluster.getAssignmentById(topo2.getId()).getExecutorToSlot().size());
assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
@@ -952,6 +953,88 @@ public class TestResourceAwareScheduler {
}
/**
+ * Min CPU for worker set to 50%. 1 supervisor with 100% CPU.
+ * A topology with 10 components, each requesting 10% CPU, should schedule.
+ */
+ @Test
+ public void minCpuWorkerJustFits() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+ Config config = createClusterConfig(10, 500, 500, null);
+ config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 50.0);
+ TopologyDetails topo1 = genTopology("topo-1", config, 10, 0, 1, 1, currentTime - 2, 20, "jerry");
+ Topologies topologies = new Topologies(topo1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config);
+ scheduler.schedule(topologies, cluster);
+ assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+ }
+
+ /**
+ * Min CPU for worker set to 40%. 1 supervisor with 100% CPU.
+ * 2 topologies, each with 2 components requesting 10% CPU, should schedule. A third topology should then fail scheduling due to lack of CPU.
+ */
+ @Test
+ public void minCpuPreventsThirdTopo() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+ Config config = createClusterConfig(10, 500, 500, null);
+ config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 40.0);
+ TopologyDetails topo1 = genTopology("topo-1", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo2 = genTopology("topo-2", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo3 = genTopology("topo-3", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+ Topologies topologies = new Topologies(topo1, topo2, topo3);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config);
+ scheduler.schedule(topologies, cluster);
+ assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+ assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+ assertFalse("topo-3 unscheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topo1.getId());
+ assertEquals(1, assignment1.getSlots().size());
+ Map<WorkerSlot, WorkerResources> assignedSlots1 = assignment1.getScheduledResources();
+ double assignedCpu = 0.0;
+ for (Entry<WorkerSlot, WorkerResources> entry : assignedSlots1.entrySet()) {
+ WorkerResources wr = entry.getValue();
+ assignedCpu += wr.get_cpu();
+ }
+ assertEquals(40.0, assignedCpu, 0.001);
+
+ SchedulerAssignment assignment2 = cluster.getAssignmentById(topo2.getId());
+ assertEquals(1, assignment2.getSlots().size());
+ Map<WorkerSlot, WorkerResources> assignedSlots2 = assignment2.getScheduledResources();
+ assignedCpu = 0.0;
+ for (Entry<WorkerSlot, WorkerResources> entry : assignedSlots2.entrySet()) {
+ WorkerResources wr = entry.getValue();
+ assignedCpu += wr.get_cpu();
+ }
+ assertEquals(40.0, assignedCpu, 0.001);
+ }
+
+ /**
+ * Min CPU for worker set to 50%. 1 supervisor with 100% CPU.
+ * A topology with 3 workers should fail scheduling even if under CPU.
+ */
+ @Test
+ public void minCpuWorkerSplitFails() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+ Config config = createClusterConfig(10, 500, 500, null);
+ config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 50.0);
+ TopologyDetails topo1 = genTopology("topo-1", config, 10, 0, 1, 1, currentTime - 2, 20,
+ "jerry", 2000.0);
+ Topologies topologies = new Topologies(topo1);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config);
+ scheduler.schedule(topologies, cluster);
+ assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+ }
+
+ /**
* Test multiple spouts and cyclic topologies
*/
@Test