You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2019/04/03 15:55:28 UTC

[storm] branch master updated: STORM-3366 allow configuring workers to have a minimum cpu percentage

This is an automated email from the ASF dual-hosted git repository.

kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new ecca56d  STORM-3366 allow configuring workers to have a minimum cpu percentage
     new 2947fdb  Merge pull request #2985 from agresch/agresch_STORM-3366
ecca56d is described below

commit ecca56d770ff8ec0ba514758ae5b908e1b5d65df
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 1 12:36:15 2019 -0500

    STORM-3366 allow configuring workers to have a minimum cpu percentage
---
 conf/defaults.yaml                                 |  2 +
 .../main/java/org/apache/storm/DaemonConfig.java   | 11 +++
 .../java/org/apache/storm/scheduler/Cluster.java   | 42 +++++++++--
 .../resource/TestResourceAwareScheduler.java       | 83 ++++++++++++++++++++++
 4 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 548f14c..db255d8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -389,6 +389,8 @@ storm.supervisor.low.memory.threshold.mb: 1024
 storm.supervisor.medium.memory.threshold.mb: 1536
 storm.supervisor.medium.memory.grace.period.ms: 30000
 
+storm.worker.min.cpu.pcore.percent: 0.0
+
 storm.topology.classpath.beginning.enabled: false
 worker.metrics:
     "CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 8015c1d..addf28c 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1126,6 +1126,17 @@ public class DaemonConfig implements Validated {
     public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
         "storm.supervisor.medium.memory.grace.period.ms";
 
+    /**
+     * The config indicates the minimum percentage of cpu for a core that a worker will use. Assuming a core value to be
+     * 100, a value of 10 indicates 10% of the core. The P in PCORE represents the term "physical".  A default value will be set for this
+     * config if the user does not override it.
+     * <P></P>
+     * Workers in containers or cgroups may require a minimum amount of CPU in order to launch within the supervisor timeout.
+     * This setting lets the scheduler assign each worker at least that much CPU.
+     */
+    @isPositiveNumber(includeZero = true)
+    public static String STORM_WORKER_MIN_CPU_PCORE_PERCENT = "storm.worker.min.cpu.pcore.percent";
+
     // VALIDATION ONLY CONFIGS
     // Some configs inside Config.java may reference classes we don't want to expose in storm-client, but we still want to validate
     // That they reference a valid class.  To allow this to happen we do part of the validation on the client side with annotations on
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index b8bc69a..46be6ca 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.WorkerResources;
@@ -85,6 +86,7 @@ public class Cluster implements ISchedulingState {
     private SchedulerAssignmentImpl assignment;
     private Set<String> blackListedHosts = new HashSet<>();
     private INimbus inimbus;
+    private double minWorkerCpu = 0.0;
 
     public Cluster(
         INimbus nimbus,
@@ -157,6 +159,7 @@ public class Cluster implements ISchedulingState {
         }
         this.conf = conf;
         this.topologies = topologies;
+        this.minWorkerCpu = ObjectReader.getDouble(conf.get(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT), 0.0);
 
         ArrayList<String> supervisorHostNames = new ArrayList<>();
         for (SupervisorDetails s : supervisors.values()) {
@@ -473,7 +476,7 @@ public class Cluster implements ISchedulingState {
         for (SchedulerAssignment assignment: assignments.values()) {
             for (Entry<WorkerSlot, WorkerResources> entry: assignment.getScheduledResources().entrySet()) {
                 if (sd.getId().equals(entry.getKey().getNodeId())) {
-                   ret.remove(entry.getValue(), getResourceMetrics());
+                    ret.remove(entry.getValue(), getResourceMetrics());
                 }
             }
         }
@@ -513,11 +516,19 @@ public class Cluster implements ISchedulingState {
             );
         }
         sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
+
+        Map<String, Double> totalResourcesMap = totalResources.toNormalizedMap();
+        Double cpu = totalResources.getTotalCpu();
+        if (cpu < minWorkerCpu) {
+            cpu = minWorkerCpu;
+            totalResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        }
+
         WorkerResources ret = new WorkerResources();
-        ret.set_resources(totalResources.toNormalizedMap());
+        ret.set_resources(totalResourcesMap);
         ret.set_shared_resources(sharedTotalResources);
+        ret.set_cpu(cpu);
 
-        ret.set_cpu(totalResources.getTotalCpu());
         ret.set_mem_off_heap(totalResources.getOffHeapMemoryMb());
         ret.set_mem_on_heap(totalResources.getOnHeapMemoryMb());
         ret.set_shared_mem_off_heap(
@@ -548,6 +559,9 @@ public class Cluster implements ISchedulingState {
         double afterTotal = 0.0;
         double afterOnHeap = 0.0;
 
+        double currentCpuTotal = 0.0;
+        double afterCpuTotal = 0.0;
+
         Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
         wouldBeAssigned.add(exec);
         SchedulerAssignmentImpl assignment = assignments.get(td.getId());
@@ -558,6 +572,7 @@ public class Cluster implements ISchedulingState {
                 wouldBeAssigned.addAll(currentlyAssigned);
                 WorkerResources wrCurrent = calculateWorkerResources(td, currentlyAssigned);
                 currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
+                currentCpuTotal = wrCurrent.get_cpu();
             }
             WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
             afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
@@ -565,6 +580,25 @@ public class Cluster implements ISchedulingState {
 
             currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
             afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
+            afterCpuTotal = wrAfter.get_cpu();
+        } else {
+            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+            afterCpuTotal = wrAfter.get_cpu();
+        }
+
+        double cpuAdded = afterCpuTotal - currentCpuTotal;
+        double cpuAvailable = resourcesAvailable.getTotalCpu();
+
+        if (cpuAdded > cpuAvailable) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
+                        td.getName(),
+                        exec,
+                        ws,
+                        cpuAdded,
+                        cpuAvailable);
+            }
+            return false;
         }
 
         double memoryAdded = afterTotal - currentTotal;
@@ -967,7 +1001,7 @@ public class Cluster implements ISchedulingState {
     }
 
     /**
-     * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot.
+     * This method updates ScheduledResources and UsedSlots cache for given workerSlot.
      */
     private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
         String nodeId = workerSlot.getNodeId();
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 592a3b6..99da7b0 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -945,6 +945,7 @@ public class TestResourceAwareScheduler {
         scheduler.prepare(config);
         scheduler.schedule(topologies, cluster);
 
+        assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
         assertTrue("Topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
         assertEquals("Topo-2 all executors scheduled?", 4, cluster.getAssignmentById(topo2.getId()).getExecutorToSlot().size());
         assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
@@ -952,6 +953,88 @@ public class TestResourceAwareScheduler {
     }
 
     /**
+     * Min CPU for worker set to 50%.  1 supervisor with 100% CPU.
+     * A topology with 10 10% components should schedule.
+     */
+    @Test
+    public void minCpuWorkerJustFits() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+        Config config = createClusterConfig(10, 500, 500, null);
+        config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 50.0);
+        TopologyDetails topo1 = genTopology("topo-1", config, 10, 0, 1, 1, currentTime - 2, 20, "jerry");
+        Topologies topologies = new Topologies(topo1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+        assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+    }
+
+    /**
+     * Min CPU for worker set to 40%.  1 supervisor with 100% CPU.
+     * 2 topologies with 2 10% components should schedule.  A third topology should then fail scheduling due to lack of CPU.
+     */
+    @Test
+    public void minCpuPreventsThirdTopo() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+        Config config = createClusterConfig(10, 500, 500, null);
+        config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 40.0);
+        TopologyDetails topo1 = genTopology("topo-1", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo2 = genTopology("topo-2", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo3 = genTopology("topo-3", config, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
+        Topologies topologies = new Topologies(topo1, topo2, topo3);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+        assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+        assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+        assertFalse("topo-3 unscheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+
+        SchedulerAssignment assignment1 = cluster.getAssignmentById(topo1.getId());
+        assertEquals(1, assignment1.getSlots().size());
+        Map<WorkerSlot, WorkerResources> assignedSlots1 = assignment1.getScheduledResources();
+        double assignedCpu = 0.0;
+        for (Entry<WorkerSlot, WorkerResources> entry : assignedSlots1.entrySet()) {
+            WorkerResources wr = entry.getValue();
+            assignedCpu += wr.get_cpu();
+        }
+        assertEquals(40.0, assignedCpu, 0.001);
+
+        SchedulerAssignment assignment2 = cluster.getAssignmentById(topo2.getId());
+        assertEquals(1, assignment2.getSlots().size());
+        Map<WorkerSlot, WorkerResources> assignedSlots2 = assignment2.getScheduledResources();
+        assignedCpu = 0.0;
+        for (Entry<WorkerSlot, WorkerResources> entry : assignedSlots2.entrySet()) {
+            WorkerResources wr = entry.getValue();
+            assignedCpu += wr.get_cpu();
+        }
+        assertEquals(40.0, assignedCpu, 0.001);
+    }
+
+    /**
+     * Min CPU for worker set to 50%.  1 supervisor with 100% CPU.
+     * A topology with 3 workers should fail scheduling even if under CPU.
+     */
+    @Test
+    public void minCpuWorkerSplitFails() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 100, 60000);
+        Config config = createClusterConfig(10, 500, 500, null);
+        config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 50.0);
+        TopologyDetails topo1 = genTopology("topo-1", config, 10, 0, 1, 1, currentTime - 2, 20,
+                "jerry", 2000.0);
+        Topologies topologies = new Topologies(topo1);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+        assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+    }
+
+    /**
      * Test multiple spouts and cyclic topologies
      */
     @Test