You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2019/04/19 14:32:34 UTC
[storm] branch master updated: STORM-3377 fix scheduling bug with
min worker cpu enabled
This is an automated email from the ASF dual-hosted git repository.
kishorvpatil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 7eec29f STORM-3377 fix scheduling bug with min worker cpu enabled
new 844631a Merge pull request #2999 from agresch/agresch_YSTORM-6160
7eec29f is described below
commit 7eec29fc80b7c2e48137486bbeb65a18e35f648e
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 15 09:21:21 2019 -0500
STORM-3377 fix scheduling bug with min worker cpu enabled
---
.../java/org/apache/storm/scheduler/Cluster.java | 6 +++-
.../apache/storm/scheduler/resource/RAS_Node.java | 5 +++-
.../normalization/NormalizedResourceOffer.java | 28 ++++++++++++++++++
.../normalization/NormalizedResources.java | 13 +++++++++
.../resource/TestResourceAwareScheduler.java | 33 ++++++++++++++++++++++
5 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 46be6ca..0f09aa7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -551,7 +551,7 @@ public class Cluster implements ISchedulingState {
double maxHeap) {
NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
- if (!resourcesAvailable.couldHoldIgnoringSharedMemory(requestedResources)) {
+ if (!resourcesAvailable.couldFit(minWorkerCpu, requestedResources)) {
return false;
}
@@ -1068,4 +1068,8 @@ public class Cluster implements ISchedulingState {
setAssignments(other.getAssignments(), false);
setStatusMap(other.getStatusMap());
}
+
+ public double getMinWorkerCpu() {
+ return minWorkerCpu;
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 89c0460..e1cd1cf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -380,9 +380,12 @@ public class RAS_Node {
* way it would ever fit.
*/
public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
+ if (!isAlive) {
+ return false;
+ }
NormalizedResourceOffer avail = getTotalAvailableResources();
NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
- return isAlive && avail.couldHoldIgnoringSharedMemory(requestedResources);
+ return avail.couldFit(cluster.getMinWorkerCpu(), requestedResources);
}
@Override
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 707c893..b8a6431 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -144,6 +144,11 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
}
+ public boolean couldHoldIgnoringSharedMemoryAndCpu(NormalizedResourcesWithMemory other) {
+ return normalizedResources.couldHoldIgnoringSharedMemoryAndCpu(
+ other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+ }
+
public double getTotalCpu() {
return normalizedResources.getTotalCpu();
}
@@ -177,4 +182,27 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
public boolean areAnyOverZero() {
return totalMemoryMb > 0 || normalizedResources.areAnyOverZero();
}
+
+ /**
+ * Is there any possibility that a resource request could ever fit on this.
+ * @param minWorkerCpu the configured minimum worker CPU
+ * @param requestedResources the requested resources
+ * @return true if the request might possibly fit (with no guarantee that it will), or false if there is no
+ * way it could ever fit.
+ */
+ public boolean couldFit(double minWorkerCpu, NormalizedResourceRequest requestedResources) {
+ if (minWorkerCpu < 0.001) {
+ return this.couldHoldIgnoringSharedMemory(requestedResources);
+ } else {
+ // Assume that there could be a worker already on the node that is under the minWorkerCpu budget.
+ // It's possible we could combine with it. Let's disregard minWorkerCpu from the request
+ // and validate that CPU as a rough fit.
+ double requestedCpu = Math.max(requestedResources.getTotalCpu() - minWorkerCpu, 0.0);
+ if (requestedCpu > this.getTotalCpu()) {
+ return false;
+ }
+ // now check memory only
+ return this.couldHoldIgnoringSharedMemoryAndCpu(requestedResources);
+ }
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 6419406..01eba1e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -210,6 +210,19 @@ public class NormalizedResources {
if (this.cpu < other.getTotalCpu()) {
return false;
}
+ return couldHoldIgnoringSharedMemoryAndCpu(other, thisTotalMemoryMb, otherTotalMemoryMb);
+ }
+
+ /**
+ * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other, ignoring memory and CPU. It
+ * does not check memory because with shared memory that is beyond the scope of this check, and it does not check CPU so that callers can validate CPU separately.
+ *
+ * @param other the resources that we want to check if they would fit in this.
+ * @param thisTotalMemoryMb The total memory in MB of this
+ * @param otherTotalMemoryMb The total memory in MB of other
+ * @return true if it might fit, else false if it could not possibly fit.
+ */
+ public boolean couldHoldIgnoringSharedMemoryAndCpu(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
int length = Math.max(this.otherResources.length, other.otherResources.length);
for (int i = 0; i < length; i++) {
if (getResourceAt(i) < other.getResourceAt(i)) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 99da7b0..405e2ae 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -1014,6 +1014,39 @@ public class TestResourceAwareScheduler {
assertEquals(40.0, assignedCpu, 0.001);
}
+ @Test
+ public void testMinCpuMaxMultipleSupervisors() {
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(3, 4, 300, 60000);
+ Config config = createClusterConfig(5, 50, 50, null);
+ config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 100.0);
+ TopologyDetails topo0 = genTopology("topo-0", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo1 = genTopology("topo-1", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo2 = genTopology("topo-2", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo3 = genTopology("topo-3", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo4 = genTopology("topo-4", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo5 = genTopology("topo-5", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo6 = genTopology("topo-6", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo7 = genTopology("topo-7", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo8 = genTopology("topo-8", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ TopologyDetails topo9 = genTopology("topo-9", config, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
+ Topologies topologies = new Topologies(topo0, topo1, topo2, topo3, topo4, topo5, topo6, topo7, topo8, topo9);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(config);
+ scheduler.schedule(topologies, cluster);
+ assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
+ assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
+ assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+ assertTrue("topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+ assertTrue("topo-4 scheduled?", cluster.getAssignmentById(topo4.getId()) != null);
+ assertTrue("topo-5 scheduled?", cluster.getAssignmentById(topo5.getId()) != null);
+ assertTrue("topo-6 scheduled?", cluster.getAssignmentById(topo6.getId()) != null);
+ assertTrue("topo-7 scheduled?", cluster.getAssignmentById(topo7.getId()) != null);
+ assertTrue("topo-8 scheduled?", cluster.getAssignmentById(topo8.getId()) != null);
+ assertFalse("topo-9 unscheduled?", cluster.getAssignmentById(topo9.getId()) != null);
+ }
+
/**
* Min CPU for worker set to 50%. 1 supervisor with 100% CPU.
* A topology with 3 workers should fail scheduling even if under CPU.