You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2019/04/19 14:32:34 UTC

[storm] branch master updated: STORM-3377 fix scheduling bug with min worker cpu enabled

This is an automated email from the ASF dual-hosted git repository.

kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eec29f  STORM-3377 fix scheduling bug with min worker cpu enabled
     new 844631a  Merge pull request #2999 from agresch/agresch_YSTORM-6160
7eec29f is described below

commit 7eec29fc80b7c2e48137486bbeb65a18e35f648e
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 15 09:21:21 2019 -0500

    STORM-3377 fix scheduling bug with min worker cpu enabled
---
 .../java/org/apache/storm/scheduler/Cluster.java   |  6 +++-
 .../apache/storm/scheduler/resource/RAS_Node.java  |  5 +++-
 .../normalization/NormalizedResourceOffer.java     | 28 ++++++++++++++++++
 .../normalization/NormalizedResources.java         | 13 +++++++++
 .../resource/TestResourceAwareScheduler.java       | 33 ++++++++++++++++++++++
 5 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 46be6ca..0f09aa7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -551,7 +551,7 @@ public class Cluster implements ISchedulingState {
         double maxHeap) {
 
         NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-        if (!resourcesAvailable.couldHoldIgnoringSharedMemory(requestedResources)) {
+        if (!resourcesAvailable.couldFit(minWorkerCpu, requestedResources)) {
             return false;
         }
 
@@ -1068,4 +1068,8 @@ public class Cluster implements ISchedulingState {
         setAssignments(other.getAssignments(), false);
         setStatusMap(other.getStatusMap());
     }
+
+    public double getMinWorkerCpu() {
+        return minWorkerCpu;
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 89c0460..e1cd1cf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -380,9 +380,12 @@ public class RAS_Node {
      *     way it would ever fit.
      */
     public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
+        if (!isAlive) {
+            return false;
+        }
         NormalizedResourceOffer avail = getTotalAvailableResources();
         NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
-        return isAlive && avail.couldHoldIgnoringSharedMemory(requestedResources);
+        return avail.couldFit(cluster.getMinWorkerCpu(), requestedResources);
     }
 
     @Override
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 707c893..b8a6431 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -144,6 +144,11 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
             other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
     }
 
+    public boolean couldHoldIgnoringSharedMemoryAndCpu(NormalizedResourcesWithMemory other) {
+        return normalizedResources.couldHoldIgnoringSharedMemoryAndCpu(
+                other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+    }
+
     public double getTotalCpu() {
         return normalizedResources.getTotalCpu();
     }
@@ -177,4 +182,27 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     public boolean areAnyOverZero() {
         return totalMemoryMb > 0 || normalizedResources.areAnyOverZero();
     }
+
+    /**
+     * Is there any possibility that a resource request could ever fit on this.
+     * @param minWorkerCpu the configured minimum worker CPU
+     * @param requestedResources the requested resources
+     * @return true if there is the possibility it might fit, no guarantee that it will, or false if there is no
+     *     way it would ever fit.
+     */
+    public boolean couldFit(double minWorkerCpu, NormalizedResourceRequest requestedResources) {
+        if (minWorkerCpu < 0.001) {
+            return this.couldHoldIgnoringSharedMemory(requestedResources);
+        } else {
+            // Assume that there could be a worker already on the node that is under the minWorkerCpu budget.
+            // It's possible we could combine with it.  Let's disregard minWorkerCpu from the request
+            // and validate the remaining CPU request as a rough fit.
+            double requestedCpu = Math.max(requestedResources.getTotalCpu() - minWorkerCpu, 0.0);
+            if (requestedCpu > this.getTotalCpu()) {
+                return false;
+            }
+            // now check memory only
+            return this.couldHoldIgnoringSharedMemoryAndCpu(requestedResources);
+        }
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 6419406..01eba1e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -210,6 +210,19 @@ public class NormalizedResources {
         if (this.cpu < other.getTotalCpu()) {
             return false;
         }
+        return couldHoldIgnoringSharedMemoryAndCpu(other, thisTotalMemoryMb, otherTotalMemoryMb);
+    }
+
+    /**
+     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other, ignoring both
+     * memory and CPU. It does not check memory because shared memory is beyond the scope of this check; it also does not check CPU.
+     *
+     * @param other              the resources that we want to check if they would fit in this.
+     * @param thisTotalMemoryMb  The total memory in MB of this
+     * @param otherTotalMemoryMb The total memory in MB of other
+     * @return true if it might fit, else false if it could not possibly fit.
+     */
+    public boolean couldHoldIgnoringSharedMemoryAndCpu(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
         int length = Math.max(this.otherResources.length, other.otherResources.length);
         for (int i = 0; i < length; i++) {
             if (getResourceAt(i) < other.getResourceAt(i)) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 99da7b0..405e2ae 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -1014,6 +1014,39 @@ public class TestResourceAwareScheduler {
         assertEquals(40.0, assignedCpu, 0.001);
     }
 
+    @Test
+    public void testMinCpuMaxMultipleSupervisors() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisors(3, 4, 300, 60000);
+        Config config = createClusterConfig(5, 50, 50, null);
+        config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 100.0);
+        TopologyDetails topo0 = genTopology("topo-0", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo1 = genTopology("topo-1", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo2 = genTopology("topo-2", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo3 = genTopology("topo-3", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo4 = genTopology("topo-4", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo5 = genTopology("topo-5", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo6 = genTopology("topo-6", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo7 = genTopology("topo-7", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo8 = genTopology("topo-8", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        TopologyDetails topo9 = genTopology("topo-9", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+        Topologies topologies = new Topologies(topo0, topo1, topo2, topo3, topo4, topo5, topo6, topo7, topo8, topo9);
+        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(config);
+        scheduler.schedule(topologies, cluster);
+        assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
+        assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+        assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+        assertTrue("topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+        assertTrue("topo-4 scheduled?", cluster.getAssignmentById(topo4.getId()) != null);
+        assertTrue("topo-5 scheduled?", cluster.getAssignmentById(topo5.getId()) != null);
+        assertTrue("topo-6 scheduled?", cluster.getAssignmentById(topo6.getId()) != null);
+        assertTrue("topo-7 scheduled?", cluster.getAssignmentById(topo7.getId()) != null);
+        assertTrue("topo-8 scheduled?", cluster.getAssignmentById(topo8.getId()) != null);
+        assertFalse("topo-9 unscheduled?", cluster.getAssignmentById(topo9.getId()) != null);
+    }
+
     /**
      * Min CPU for worker set to 50%.  1 supervisor with 100% CPU.
      * A topology with 3 workers should fail scheduling even if under CPU.