You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/05 18:08:15 UTC
[storm] branch 2.1.x-branch updated: STORM-3488 Scheduling can
cause RAS_Node resources to become negative
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/2.1.x-branch by this push:
new a6a8b4b STORM-3488 Scheduling can cause RAS_Node resources to become negative
new ad9f8fc Merge pull request #3123 from dandsager1/2.1.x-STORM-3488
a6a8b4b is described below
commit a6a8b4b9c2d8d8aac76fb36fc332b500d064a9e4
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Wed Jul 31 13:47:45 2019 -0500
STORM-3488 Scheduling can cause RAS_Node resources to become negative
---
.../storm/daemon/nimbus/TopologyResources.java | 16 +--
.../java/org/apache/storm/scheduler/Cluster.java | 98 ++++++++++--------
.../storm/scheduler/SchedulerAssignment.java | 6 +-
.../storm/scheduler/SchedulerAssignmentImpl.java | 20 ++--
.../apache/storm/scheduler/resource/RAS_Node.java | 7 +-
.../TestDefaultResourceAwareStrategy.java | 115 ++++++++++++++++-----
.../TestGenericResourceAwareStrategy.java | 2 +-
7 files changed, 172 insertions(+), 92 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index bb932ed..a34292c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -37,7 +37,7 @@ public final class TopologyResources {
private double assignedNonSharedMemOffHeap;
private double assignedCpu;
private TopologyResources(TopologyDetails td, Collection<WorkerResources> workers,
- Map<String, Double> sharedOffHeap) {
+ Map<String, Double> nodeIdToSharedOffHeapNode) {
requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
requestedSharedMemOnHeap = td.getRequestedSharedOnHeap();
@@ -73,17 +73,17 @@ public final class TopologyResources {
}
}
- if (sharedOffHeap != null) {
- double sharedOff = sharedOffHeap.values().stream().reduce(0.0, (sum, val) -> sum + val);
+ if (nodeIdToSharedOffHeapNode != null) {
+ double sharedOff = nodeIdToSharedOffHeapNode.values().stream().reduce(0.0, (sum, val) -> sum + val);
assignedSharedMemOffHeap += sharedOff;
assignedMemOffHeap += sharedOff;
}
}
public TopologyResources(TopologyDetails td, SchedulerAssignment assignment) {
- this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+ this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
}
public TopologyResources(TopologyDetails td, Assignment assignment) {
- this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+ this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
}
public TopologyResources() {
this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -142,15 +142,15 @@ public final class TopologyResources {
return ret;
}
- private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
+ private static Map<String, Double> getNodeIdToSharedOffHeapNode(SchedulerAssignment assignment) {
Map<String, Double> ret = null;
if (assignment != null) {
- ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ ret = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
}
return ret;
}
- private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) {
+ private static Map<String, Double> getNodeIdToSharedOffHeapNode(Assignment assignment) {
Map<String, Double> ret = null;
if (assignment != null) {
ret = assignment.get_total_shared_off_heap();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0f09aa7..99360ff 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -54,8 +54,6 @@ import org.slf4j.LoggerFactory;
*/
public class Cluster implements ISchedulingState {
private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
- private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
- private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
/**
* key: supervisor id, value: supervisor details.
@@ -80,6 +78,7 @@ public class Cluster implements ISchedulingState {
private final Map<String, Object> conf;
private final Topologies topologies;
private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
+ private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache; // node -> topologyId -> double
private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
private final ResourceMetrics resourceMetrics;
@@ -88,6 +87,14 @@ public class Cluster implements ISchedulingState {
private INimbus inimbus;
private double minWorkerCpu = 0.0;
+ private static <K, V> Map<K, V> makeMap(String key) {
+ return new HashMap<>();
+ }
+
+ private static <K> Set<K> makeSet(String key) {
+ return new HashSet<>();
+ }
+
public Cluster(
INimbus nimbus,
ResourceMetrics resourceMetrics,
@@ -148,6 +155,7 @@ public class Cluster implements ISchedulingState {
this.resourceMetrics = resourceMetrics;
this.supervisors.putAll(supervisors);
this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
+ this.nodeToScheduledOffHeapNodeMemoryCache = new HashMap<>();
this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -359,7 +367,7 @@ public class Cluster implements ISchedulingState {
@Override
public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
- return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+ return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), Cluster::makeSet)
.stream()
.map(WorkerSlot::getPort)
.collect(Collectors.toSet());
@@ -504,7 +512,7 @@ public class Cluster implements ISchedulingState {
}
for (SharedMemory shared : td.getSharedMemoryRequests(executors)) {
totalResources.addOffHeap(shared.get_off_heap_worker());
- totalResources.addOnHeap(shared.get_off_heap_worker());
+ totalResources.addOnHeap(shared.get_on_heap());
addResource(
sharedTotalResources,
@@ -556,11 +564,7 @@ public class Cluster implements ISchedulingState {
}
double currentTotal = 0.0;
- double afterTotal = 0.0;
- double afterOnHeap = 0.0;
-
double currentCpuTotal = 0.0;
- double afterCpuTotal = 0.0;
Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
wouldBeAssigned.add(exec);
@@ -574,18 +578,17 @@ public class Cluster implements ISchedulingState {
currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
currentCpuTotal = wrCurrent.get_cpu();
}
- WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
- afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
- afterOnHeap = wrAfter.get_mem_on_heap();
- currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
- afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
- afterCpuTotal = wrAfter.get_cpu();
- } else {
- WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
- afterCpuTotal = wrAfter.get_cpu();
+ currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td);
}
+ WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+ double afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
+ afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td, exec);
+
+ double afterOnHeap = wrAfter.get_mem_on_heap();
+ double afterCpuTotal = wrAfter.get_cpu();
+
double cpuAdded = afterCpuTotal - currentCpuTotal;
double cpuAvailable = resourcesAvailable.getTotalCpu();
@@ -673,9 +676,9 @@ public class Cluster implements ISchedulingState {
assignment.assign(slot, executors, resources);
String nodeId = slot.getNodeId();
- double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
- assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
- updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+ double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment, td);
+ assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
+ updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
totalResourcesPerNodeCache.remove(slot.getNodeId());
}
@@ -700,31 +703,32 @@ public class Cluster implements ISchedulingState {
}
/**
- * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+ * Calculate the amount of shared off heap node memory on a given node with the given assignment.
*
* @param nodeId the id of the node
* @param assignment the current assignment
- * @return the amount of shared off heap memory for that node in MB
+ * @param td the topology details
+ * @return the amount of shared off heap node memory for that node in MB
*/
- private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
- return calculateSharedOffHeapMemory(nodeId, assignment, null);
+ private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td) {
+ return calculateSharedOffHeapNodeMemory(nodeId, assignment, td, null);
}
- private double calculateSharedOffHeapMemory(
- String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
- double memorySharedWithinNode = 0.0;
- TopologyDetails td = topologies.getById(assignment.getTopologyId());
+ private double calculateSharedOffHeapNodeMemory(
+ String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
Set<ExecutorDetails> executorsOnNode = new HashSet<>();
- for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
- assignment.getSlotToExecutors().entrySet()) {
- if (nodeId.equals(entry.getKey().getNodeId())) {
- executorsOnNode.addAll(entry.getValue());
+ if (assignment != null) {
+ for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
+ if (nodeId.equals(entry.getKey().getNodeId())) {
+ executorsOnNode.addAll(entry.getValue());
+ }
}
}
if (extra != null) {
executorsOnNode.add(extra);
}
//Now check for overlap on the node
+ double memorySharedWithinNode = 0.0;
for (SharedMemory shared : td.getSharedMemoryRequests(executorsOnNode)) {
memorySharedWithinNode += shared.get_off_heap_node();
}
@@ -743,10 +747,11 @@ public class Cluster implements ISchedulingState {
assertValidTopologyForModification(assignment.getTopologyId());
assignment.unassignBySlot(slot);
String nodeId = slot.getNodeId();
- assignment.setTotalSharedOffHeapMemory(
- nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
+ TopologyDetails td = topologies.getById(assignment.getTopologyId());
+ assignment.setTotalSharedOffHeapNodeMemory(
+ nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
}
}
//Invalidate the cache as something on the node changed
@@ -768,7 +773,7 @@ public class Cluster implements ISchedulingState {
@Override
public boolean isSlotOccupied(WorkerSlot slot) {
- return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
+ return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), Cluster::makeSet).contains(slot);
}
@Override
@@ -963,7 +968,7 @@ public class Cluster implements ISchedulingState {
sr = sr.add(entry.getValue());
ret.put(id, sr);
}
- Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
if (nodeIdToSharedOffHeap != null) {
for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
String id = entry.getKey();
@@ -1003,13 +1008,14 @@ public class Cluster implements ISchedulingState {
/**
* This method updates ScheduledResources and UsedSlots cache for given workerSlot.
*/
- private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
+ private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, String topologyId,
+ Double sharedOffHeapNodeMemory) {
String nodeId = workerSlot.getNodeId();
NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
normalizedResourceRequest.add(workerResources);
- normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
- nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
- nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
+ nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(workerSlot, normalizedResourceRequest);
+ nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nodeId, Cluster::makeMap).put(topologyId, sharedOffHeapNodeMemory);
+ nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).add(workerSlot);
}
public ResourceMetrics getResourceMetrics() {
@@ -1019,10 +1025,16 @@ public class Cluster implements ISchedulingState {
@Override
public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+ // executor resources
NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
- for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+ for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
totalScheduledResources.add(req);
}
+ // shared off heap node memory
+ for (Double offHeapNodeMemory : nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nid, Cluster::makeMap).values()) {
+ totalScheduledResources.addOffHeap(offHeapNodeMemory);
+ }
+
return totalScheduledResources;
});
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index 347c95f..e2ad2d4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -83,9 +83,9 @@ public interface SchedulerAssignment {
public Map<WorkerSlot, WorkerResources> getScheduledResources();
/**
- * Get the total shared off heap memory mapping.
+ * Get the total shared off heap node memory mapping.
*
- * @return host to total shared off heap memory mapping.
+ * @return host to total shared off heap node memory mapping.
*/
- public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory();
+ public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory();
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index 077dafe..ee41630 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -18,12 +18,10 @@
package org.apache.storm.scheduler;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -45,7 +43,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
*/
private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
- private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
+ private final Map<String, Double> nodeIdToTotalSharedOffHeapNode = new HashMap<>();
private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
/**
@@ -78,7 +76,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
if (nodeIdToTotalSharedOffHeap.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
throw new RuntimeException("Cannot create off heap with a null in it " + nodeIdToTotalSharedOffHeap);
}
- this.nodeIdToTotalSharedOffHeap.putAll(nodeIdToTotalSharedOffHeap);
+ this.nodeIdToTotalSharedOffHeapNode.putAll(nodeIdToTotalSharedOffHeap);
}
}
@@ -88,7 +86,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
- assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory());
+ assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapNodeMemory());
}
@Override
@@ -132,7 +130,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
return resources.equals(o.resources)
- && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+ && nodeIdToTotalSharedOffHeapNode.equals(o.nodeIdToTotalSharedOffHeapNode);
}
@Override
@@ -186,7 +184,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
}
}
if (!isFound) {
- nodeIdToTotalSharedOffHeap.remove(node);
+ nodeIdToTotalSharedOffHeapNode.remove(node);
}
}
@@ -225,12 +223,12 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
return resources;
}
- public void setTotalSharedOffHeapMemory(String node, double value) {
- nodeIdToTotalSharedOffHeap.put(node, value);
+ public void setTotalSharedOffHeapNodeMemory(String node, double value) {
+ nodeIdToTotalSharedOffHeapNode.put(node, value);
}
@Override
- public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() {
- return nodeIdToTotalSharedOffHeap;
+ public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory() {
+ return nodeIdToTotalSharedOffHeapNode;
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index e1cd1cf..ddea221 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -19,6 +19,7 @@
package org.apache.storm.scheduler.resource;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -163,11 +164,11 @@ public class RAS_Node {
* @return the slots currently assigned to that topology on this node.
*/
public Collection<WorkerSlot> getUsedSlots(String topId) {
- Collection<WorkerSlot> ret = null;
if (topIdToUsedSlots.get(topId) != null) {
- ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+ return workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+ } else {
+ return Collections.emptySet();
}
- return ret;
}
public boolean isAlive() {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index d72d362..d1ee091 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -49,13 +49,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -104,6 +105,75 @@ public class TestDefaultResourceAwareStrategy {
}
}
+ /*
+ * Test that scheduling does not cause RAS_Node resources to become negative.
+ */
+ @Test
+ public void testSchedulingNegativeResources() {
+ int spoutParallelism = 2;
+ int boltParallelism = 2;
+ double cpuPercent = 10;
+ double memoryOnHeap = 10;
+ double memoryOffHeap = 10;
+ double sharedOnHeapWithinWorker = 400;
+ double sharedOffHeapWithinNode = 700;
+ double sharedOffHeapWithinWorker = 500;
+
+ Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+ TopologyDetails[] topo = new TopologyDetails[2];
+
+ // 1st topology
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
+ builder.setBolt("bolt-3", new TestBolt(),
+ boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
+ StormTopology stormToplogy = builder.createTopology();
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 1);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology-0");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+ topo[0] = new TopologyDetails("testTopology-id-0", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ // 2nd topology
+ builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
+ stormToplogy = builder.createTopology();
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology-1");
+ topo[1] = new TopologyDetails("testTopology-id-1", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 500, 2000);
+ Topologies topologies = new Topologies(topo[0]);
+ Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+ // schedule 1st topology
+ scheduler = new ResourceAwareScheduler();
+ scheduler.prepare(conf);
+ scheduler.schedule(topologies, cluster);
+ assertTopologiesFullyScheduled(cluster, topo[0].getName());
+
+ // Attempt to schedule both topologies.
+ // This used to trigger a negative resource event because the second topology was incorrectly scheduled while the first was still in place.
+ // The first topology should be evicted so that the higher-priority (lower value) second topology can schedule successfully.
+ topologies = new Topologies(topo[0], topo[1]);
+ cluster = new Cluster(cluster, topologies);
+ scheduler.schedule(topologies, cluster);
+ assertTopologiesNotScheduled(cluster, topo[0].getName());
+ assertTopologiesFullyScheduled(cluster, topo[1].getName());
+
+ // verify that no negative resource events were recorded
+ assertThat(cluster.getResourceMetrics().getNegativeResourceEventsMeter().getCount(), is(0L));
+ }
+
/**
* test if the scheduling logic for the DefaultResourceAwareStrategy is correct
*/
@@ -115,25 +185,26 @@ public class TestDefaultResourceAwareStrategy {
double cpuPercent = 10;
double memoryOnHeap = 10;
double memoryOffHeap = 10;
- double sharedOnHeap = 500;
- double sharedOffHeapNode = 700;
- double sharedOffHeapWorker = 500;
+ double sharedOnHeapWithinWorker = 400;
+ double sharedOffHeapWithinNode = 700;
+ double sharedOffHeapWithinWorker = 600;
+
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(),
spoutParallelism);
builder.setBolt("bolt-1", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
builder.setBolt("bolt-2", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
+ boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
builder.setBolt("bolt-3", new TestBolt(),
- boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared worker")).shuffleGrouping("bolt-2");
+ boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
StormTopology stormToplogy = builder.createTopology();
INimbus iNimbus = new INimbusTest();
Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-
+
conf.put(Config.TOPOLOGY_PRIORITY, 0);
conf.put(Config.TOPOLOGY_NAME, "testTopology");
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
@@ -144,10 +215,9 @@ public class TestDefaultResourceAwareStrategy {
Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
scheduler = new ResourceAwareScheduler();
-
scheduler.prepare(conf);
scheduler.schedule(topologies, cluster);
-
+
for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
String supervisorId = entry.getKey();
SupervisorResources resources = entry.getValue();
@@ -158,25 +228,24 @@ public class TestDefaultResourceAwareStrategy {
// Everything should fit in a single slot
int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
- double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
- double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
-
+ double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+ double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
+
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
- assertEquals(1, assignment.getSlots().size());
+ assertThat(assignment.getSlots().size(), is(1));
WorkerSlot ws = assignment.getSlots().iterator().next();
String nodeId = ws.getNodeId();
- assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
- assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
- assertEquals(1, assignment.getScheduledResources().size());
+ assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
+ assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapWithinNode, 0.01));
+ assertThat(assignment.getScheduledResources().size(), is(1));
WorkerResources resources = assignment.getScheduledResources().get(ws);
- assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
- assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
- assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
- assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
- assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+ assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
+ assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
+ assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
+ assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
+ assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
}
-
/**
* test if the scheduling logic for the DefaultResourceAwareStrategy is correct
*/
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 037d226..033b3cf 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -140,7 +140,7 @@ public class TestGenericResourceAwareStrategy {
SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
Set<WorkerSlot> slots = assignment.getSlots();
- Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+ Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
assertEquals(2, slots.size());