Posted to commits@storm.apache.org by et...@apache.org on 2019/09/05 18:08:15 UTC

[storm] branch 2.1.x-branch updated: STORM-3488 Scheduling can cause RAS_Node resources to become negative

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new a6a8b4b  STORM-3488 Scheduling can cause RAS_Node resources to become negative
     new ad9f8fc  Merge pull request #3123 from dandsager1/2.1.x-STORM-3488
a6a8b4b is described below

commit a6a8b4b9c2d8d8aac76fb36fc332b500d064a9e4
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Wed Jul 31 13:47:45 2019 -0500

    STORM-3488 Scheduling can cause RAS_Node resources to become negative
---
 .../storm/daemon/nimbus/TopologyResources.java     |  16 +--
 .../java/org/apache/storm/scheduler/Cluster.java   |  98 ++++++++++--------
 .../storm/scheduler/SchedulerAssignment.java       |   6 +-
 .../storm/scheduler/SchedulerAssignmentImpl.java   |  20 ++--
 .../apache/storm/scheduler/resource/RAS_Node.java  |   7 +-
 .../TestDefaultResourceAwareStrategy.java          | 115 ++++++++++++++++-----
 .../TestGenericResourceAwareStrategy.java          |   2 +-
 7 files changed, 172 insertions(+), 92 deletions(-)
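
At a glance, the fix has two accounting pieces: node-scoped shared off-heap
memory is now cached per node and per topology (the old code folded it into
each worker slot's resource request in updateCachesForWorkerSlot), and the
TopologyDetails is passed into calculateSharedOffHeapNodeMemory explicitly
rather than looked up from the assignment. Below is a condensed, illustrative
sketch of the new cache shape; the names follow the diff, but the plumbing
around them is omitted:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    class OffHeapNodeCacheSketch {
        // node -> topologyId -> shared off-heap node memory (MB), mirroring
        // Cluster.nodeToScheduledOffHeapNodeMemoryCache in the diff below
        private final Map<String, Map<String, Double>> cache = new HashMap<>();

        // On assign/unassign: one entry per (node, topology), so one topology's
        // share can no longer clobber or absorb another's on the same node.
        void record(String nodeId, String topologyId, double mb) {
            cache.computeIfAbsent(nodeId, k -> new HashMap<>()).put(topologyId, mb);
        }

        // On lookup (see getAllScheduledResourcesForNode): every topology's
        // node-scoped share is summed on top of the executor resources.
        double sharedOffHeapNodeTotal(String nodeId) {
            return cache.getOrDefault(nodeId, Collections.emptyMap())
                        .values().stream().mapToDouble(Double::doubleValue).sum();
        }
    }

Keeping one entry per (node, topology) is what guards the totals against
drifting negative when a topology is evicted and another is scheduled in its
place, which is the scenario the new test at the bottom of this patch checks.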

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index bb932ed..a34292c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -37,7 +37,7 @@ public final class TopologyResources {
     private double assignedNonSharedMemOffHeap;
     private double assignedCpu;
     private TopologyResources(TopologyDetails td, Collection<WorkerResources> workers,
-                              Map<String, Double> sharedOffHeap) {
+                              Map<String, Double> nodeIdToSharedOffHeapNode) {
         requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
         requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
         requestedSharedMemOnHeap = td.getRequestedSharedOnHeap();
@@ -73,17 +73,17 @@ public final class TopologyResources {
             }
         }
 
-        if (sharedOffHeap != null) {
-            double sharedOff = sharedOffHeap.values().stream().reduce(0.0, (sum, val) -> sum + val);
+        if (nodeIdToSharedOffHeapNode != null) {
+            double sharedOff = nodeIdToSharedOffHeapNode.values().stream().reduce(0.0, (sum, val) -> sum + val);
             assignedSharedMemOffHeap += sharedOff;
             assignedMemOffHeap += sharedOff;
         }
     }
     public TopologyResources(TopologyDetails td, SchedulerAssignment assignment) {
-        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
     }
     public TopologyResources(TopologyDetails td, Assignment assignment) {
-        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment));
+        this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeapNode(assignment));
     }
     public TopologyResources() {
         this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
@@ -142,15 +142,15 @@ public final class TopologyResources {
         return ret;
     }
 
-    private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
+    private static Map<String, Double> getNodeIdToSharedOffHeapNode(SchedulerAssignment assignment) {
         Map<String, Double> ret = null;
         if (assignment != null) {
-            ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            ret = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
         }
         return ret;
     }
 
-    private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) {
+    private static Map<String, Double> getNodeIdToSharedOffHeapNode(Assignment assignment) {
         Map<String, Double> ret = null;
         if (assignment != null) {
             ret = assignment.get_total_shared_off_heap();
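
The rename from sharedOffHeap to nodeIdToSharedOffHeapNode makes explicit
which of Storm's shared-memory scopes this map tracks: the node-scoped share
(get_off_heap_node), as opposed to worker-scoped off-heap or on-heap shares.
For orientation, a minimal sketch of the three request types as the tests
later in this diff use them (amounts in MB; the org.apache.storm.topology
package location is assumed, since the test imports are not shown here):

    import org.apache.storm.topology.SharedOffHeapWithinNode;
    import org.apache.storm.topology.SharedOffHeapWithinWorker;
    import org.apache.storm.topology.SharedOnHeap;

    class SharedMemoryScopes {
        public static void main(String[] args) {
            // shared on-heap, within one worker JVM
            new SharedOnHeap(400, "shared on heap within worker");
            // shared off-heap, within one worker
            new SharedOffHeapWithinWorker(500, "shared off heap within worker");
            // shared off-heap, across a topology's workers on one node;
            // this is the scope the renamed "...OffHeapNode..." APIs refer to
            new SharedOffHeapWithinNode(700, "shared off heap within node");
        }
    }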
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0f09aa7..99360ff 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -54,8 +54,6 @@ import org.slf4j.LoggerFactory;
  */
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
-    private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
-    private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
 
     /**
      * key: supervisor id, value: supervisor details.
@@ -80,6 +78,7 @@ public class Cluster implements ISchedulingState {
     private final Map<String, Object> conf;
     private final Topologies topologies;
     private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
+    private final Map<String, Map<String, Double>> nodeToScheduledOffHeapNodeMemoryCache;   // node -> topologyId -> double
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
     private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
     private final ResourceMetrics resourceMetrics;
@@ -88,6 +87,14 @@ public class Cluster implements ISchedulingState {
     private INimbus inimbus;
     private double minWorkerCpu = 0.0;
 
+    private static <K, V> Map<K, V> makeMap(String key) {
+        return new HashMap<>();
+    }
+
+    private static <K> Set<K> makeSet(String key) {
+        return new HashSet<>();
+    }
+
     public Cluster(
         INimbus nimbus,
         ResourceMetrics resourceMetrics,
@@ -148,6 +155,7 @@ public class Cluster implements ISchedulingState {
         this.resourceMetrics = resourceMetrics;
         this.supervisors.putAll(supervisors);
         this.nodeToScheduledResourcesCache = new HashMap<>(this.supervisors.size());
+        this.nodeToScheduledOffHeapNodeMemoryCache = new HashMap<>();
         this.nodeToUsedSlotsCache = new HashMap<>(this.supervisors.size());
 
         for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -359,7 +367,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
-        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), Cluster::makeSet)
             .stream()
             .map(WorkerSlot::getPort)
             .collect(Collectors.toSet());
@@ -504,7 +512,7 @@ public class Cluster implements ISchedulingState {
         }
         for (SharedMemory shared : td.getSharedMemoryRequests(executors)) {
             totalResources.addOffHeap(shared.get_off_heap_worker());
-            totalResources.addOnHeap(shared.get_off_heap_worker());
+            totalResources.addOnHeap(shared.get_on_heap());
 
             addResource(
                 sharedTotalResources,
@@ -556,11 +564,7 @@ public class Cluster implements ISchedulingState {
         }
 
         double currentTotal = 0.0;
-        double afterTotal = 0.0;
-        double afterOnHeap = 0.0;
-
         double currentCpuTotal = 0.0;
-        double afterCpuTotal = 0.0;
 
         Set<ExecutorDetails> wouldBeAssigned = new HashSet<>();
         wouldBeAssigned.add(exec);
@@ -574,18 +578,17 @@ public class Cluster implements ISchedulingState {
                 currentTotal = wrCurrent.get_mem_off_heap() + wrCurrent.get_mem_on_heap();
                 currentCpuTotal = wrCurrent.get_cpu();
             }
-            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
-            afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
-            afterOnHeap = wrAfter.get_mem_on_heap();
 
-            currentTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment);
-            afterTotal += calculateSharedOffHeapMemory(ws.getNodeId(), assignment, exec);
-            afterCpuTotal = wrAfter.get_cpu();
-        } else {
-            WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
-            afterCpuTotal = wrAfter.get_cpu();
+            currentTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td);
         }
 
+        WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
+        double afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
+        afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td, exec);
+
+        double afterOnHeap = wrAfter.get_mem_on_heap();
+        double afterCpuTotal = wrAfter.get_cpu();
+
         double cpuAdded = afterCpuTotal - currentCpuTotal;
         double cpuAvailable = resourcesAvailable.getTotalCpu();
 
@@ -673,9 +676,9 @@ public class Cluster implements ISchedulingState {
 
         assignment.assign(slot, executors, resources);
         String nodeId = slot.getNodeId();
-        double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
-        assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
-        updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+        double sharedOffHeapNodeMemory = calculateSharedOffHeapNodeMemory(nodeId, assignment, td);
+        assignment.setTotalSharedOffHeapNodeMemory(nodeId, sharedOffHeapNodeMemory);
+        updateCachesForWorkerSlot(slot, resources, topologyId, sharedOffHeapNodeMemory);
         totalResourcesPerNodeCache.remove(slot.getNodeId());
     }
 
@@ -700,31 +703,32 @@ public class Cluster implements ISchedulingState {
     }
 
     /**
-     * Calculate the amount of shared off heap memory on a given nodes with the given assignment.
+     * Calculate the amount of shared off heap node memory on a given node with the given assignment.
      *
      * @param nodeId     the id of the node
      * @param assignment the current assignment
-     * @return the amount of shared off heap memory for that node in MB
+     * @param td         the topology details
+     * @return the amount of shared off heap node memory for that node in MB
      */
-    private double calculateSharedOffHeapMemory(String nodeId, SchedulerAssignmentImpl assignment) {
-        return calculateSharedOffHeapMemory(nodeId, assignment, null);
+    private double calculateSharedOffHeapNodeMemory(String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td) {
+        return calculateSharedOffHeapNodeMemory(nodeId, assignment, td, null);
     }
 
-    private double calculateSharedOffHeapMemory(
-        String nodeId, SchedulerAssignmentImpl assignment, ExecutorDetails extra) {
-        double memorySharedWithinNode = 0.0;
-        TopologyDetails td = topologies.getById(assignment.getTopologyId());
+    private double calculateSharedOffHeapNodeMemory(
+        String nodeId, SchedulerAssignmentImpl assignment, TopologyDetails td, ExecutorDetails extra) {
         Set<ExecutorDetails> executorsOnNode = new HashSet<>();
-        for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
-            assignment.getSlotToExecutors().entrySet()) {
-            if (nodeId.equals(entry.getKey().getNodeId())) {
-                executorsOnNode.addAll(entry.getValue());
+        if (assignment != null) {
+            for (Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
+                if (nodeId.equals(entry.getKey().getNodeId())) {
+                    executorsOnNode.addAll(entry.getValue());
+                }
             }
         }
         if (extra != null) {
             executorsOnNode.add(extra);
         }
         //Now check for overlap on the node
+        double memorySharedWithinNode = 0.0;
         for (SharedMemory shared : td.getSharedMemoryRequests(executorsOnNode)) {
             memorySharedWithinNode += shared.get_off_heap_node();
         }
@@ -743,10 +747,11 @@ public class Cluster implements ISchedulingState {
                 assertValidTopologyForModification(assignment.getTopologyId());
                 assignment.unassignBySlot(slot);
                 String nodeId = slot.getNodeId();
-                assignment.setTotalSharedOffHeapMemory(
-                    nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
-                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
-                nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
+                TopologyDetails td = topologies.getById(assignment.getTopologyId());
+                assignment.setTotalSharedOffHeapNodeMemory(
+                    nodeId, calculateSharedOffHeapNodeMemory(nodeId, assignment, td));
+                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(slot, new NormalizedResourceRequest());
+                nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).remove(slot);
             }
         }
         //Invalidate the cache as something on the node changed
@@ -768,7 +773,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public boolean isSlotOccupied(WorkerSlot slot) {
-        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
+        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), Cluster::makeSet).contains(slot);
     }
 
     @Override
@@ -963,7 +968,7 @@ public class Cluster implements ISchedulingState {
                 sr = sr.add(entry.getValue());
                 ret.put(id, sr);
             }
-            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapMemory();
+            Map<String, Double> nodeIdToSharedOffHeap = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
             if (nodeIdToSharedOffHeap != null) {
                 for (Entry<String, Double> entry : nodeIdToSharedOffHeap.entrySet()) {
                     String id = entry.getKey();
@@ -1003,13 +1008,14 @@ public class Cluster implements ISchedulingState {
     /**
      * This method updates ScheduledResources and UsedSlots cache for given workerSlot.
      */
-    private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
+    private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, String topologyId,
+                                           Double sharedOffHeapNodeMemory) {
         String nodeId = workerSlot.getNodeId();
         NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
         normalizedResourceRequest.add(workerResources);
-        normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
-        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
-        nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
+        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).put(workerSlot, normalizedResourceRequest);
+        nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nodeId, Cluster::makeMap).put(topologyId, sharedOffHeapNodeMemory);
+        nodeToUsedSlotsCache.computeIfAbsent(nodeId, Cluster::makeSet).add(workerSlot);
     }
 
     public ResourceMetrics getResourceMetrics() {
@@ -1019,10 +1025,16 @@ public class Cluster implements ISchedulingState {
     @Override
     public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
         return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+            // executor resources
             NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
-            for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+            for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, Cluster::makeMap).values()) {
                 totalScheduledResources.add(req);
             }
+            // shared off heap node memory
+            for (Double offHeapNodeMemory : nodeToScheduledOffHeapNodeMemoryCache.computeIfAbsent(nid, Cluster::makeMap).values()) {
+                totalScheduledResources.addOffHeap(offHeapNodeMemory);
+            }
+
             return totalScheduledResources;
         });
     }
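
A side cleanup in Cluster.java above: the fixed-type MAKE_SET/MAKE_MAP
Function constants are replaced by generic static factories, so one helper
serves computeIfAbsent call sites whose value types differ, including the new
node -> topologyId -> memory cache. A self-contained sketch of the idiom
(stand-in types, not Storm classes):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    class ComputeIfAbsentHelpers {
        // Old style: one constant per concrete value type.
        static final Function<String, Set<Integer>> MAKE_SET = x -> new HashSet<>();

        // New style: K and V are inferred at each call site, so the same
        // helper backs caches with different value types.
        static <K, V> Map<K, V> makeMap(String key) {
            return new HashMap<>();
        }

        public static void main(String[] args) {
            Map<String, Map<String, Double>> offHeapCache = new HashMap<>();
            Map<String, Map<Integer, Long>> someOtherCache = new HashMap<>();
            offHeapCache.computeIfAbsent("node-1", ComputeIfAbsentHelpers::makeMap).put("topo-1", 700.0);
            someOtherCache.computeIfAbsent("node-1", ComputeIfAbsentHelpers::makeMap).put(6700, 1L);
            System.out.println(offHeapCache + " " + someOtherCache);
        }
    }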
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index 347c95f..e2ad2d4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -83,9 +83,9 @@ public interface SchedulerAssignment {
     public Map<WorkerSlot, WorkerResources> getScheduledResources();
 
     /**
-     * Get the total shared off heap memory mapping.
+     * Get the total shared off heap node memory mapping.
      *
-     * @return host to total shared off heap memory mapping.
+     * @return host to total shared off heap node memory mapping.
      */
-    public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory();
+    public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory();
 }
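
The renamed getter above ripples through every implementer and caller in this
patch (SchedulerAssignmentImpl, TopologyResources, both strategy tests). A
hypothetical caller, shown only to illustrate the new name in use:

    import java.util.Map;
    import org.apache.storm.scheduler.SchedulerAssignment;

    class SharedOffHeapNodeReport {
        // Hypothetical helper; 'assignment' is any SchedulerAssignment.
        static void log(SchedulerAssignment assignment) {
            Map<String, Double> perNode = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
            perNode.forEach((node, mb) ->
                System.out.printf("node %s: %.1f MB shared off-heap (node scope)%n", node, mb));
        }
    }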
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index 077dafe..ee41630 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -18,12 +18,10 @@
 
 package org.apache.storm.scheduler;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -45,7 +43,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
      */
     private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
     private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
-    private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
+    private final Map<String, Double> nodeIdToTotalSharedOffHeapNode = new HashMap<>();
     private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
 
     /**
@@ -78,7 +76,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
             if (nodeIdToTotalSharedOffHeap.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
                 throw new RuntimeException("Cannot create off heap with a null in it " + nodeIdToTotalSharedOffHeap);
             }
-            this.nodeIdToTotalSharedOffHeap.putAll(nodeIdToTotalSharedOffHeap);
+            this.nodeIdToTotalSharedOffHeapNode.putAll(nodeIdToTotalSharedOffHeap);
         }
     }
 
@@ -88,7 +86,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
         this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
-             assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapMemory());
+             assignment.getScheduledResources(), assignment.getNodeIdToTotalSharedOffHeapNodeMemory());
     }
 
     @Override
@@ -132,7 +130,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
         SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
 
         return resources.equals(o.resources)
-               && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+               && nodeIdToTotalSharedOffHeapNode.equals(o.nodeIdToTotalSharedOffHeapNode);
     }
 
     @Override
@@ -186,7 +184,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
             }
         }
         if (!isFound) {
-            nodeIdToTotalSharedOffHeap.remove(node);
+            nodeIdToTotalSharedOffHeapNode.remove(node);
         }
     }
 
@@ -225,12 +223,12 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
         return resources;
     }
 
-    public void setTotalSharedOffHeapMemory(String node, double value) {
-        nodeIdToTotalSharedOffHeap.put(node, value);
+    public void setTotalSharedOffHeapNodeMemory(String node, double value) {
+        nodeIdToTotalSharedOffHeapNode.put(node, value);
     }
 
     @Override
-    public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() {
-        return nodeIdToTotalSharedOffHeap;
+    public Map<String, Double> getNodeIdToTotalSharedOffHeapNodeMemory() {
+        return nodeIdToTotalSharedOffHeapNode;
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index e1cd1cf..ddea221 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -19,6 +19,7 @@
 package org.apache.storm.scheduler.resource;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -163,11 +164,11 @@ public class RAS_Node {
      * @return the slots currently assigned to that topology on this node.
      */
     public Collection<WorkerSlot> getUsedSlots(String topId) {
-        Collection<WorkerSlot> ret = null;
         if (topIdToUsedSlots.get(topId) != null) {
-            ret = workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+            return workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
+        } else {
+            return Collections.emptySet();
         }
-        return ret;
     }
 
     public boolean isAlive() {
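
getUsedSlots(topId) previously returned null when a topology had no slots on
the node, so every caller had to null-check before iterating; returning
Collections.emptySet() removes that trap. A minimal, self-contained
illustration of why the empty collection is the safer "nothing here" value:

    import java.util.Collection;
    import java.util.Collections;

    class EmptyOverNullSketch {
        // Stand-in for getUsedSlots(topId): empty set instead of null.
        static Collection<String> usedSlots(boolean anyAssigned) {
            return anyAssigned ? Collections.singleton("node1:6700") : Collections.emptySet();
        }

        public static void main(String[] args) {
            // No null guard needed; the loop simply does nothing when empty.
            for (String slot : usedSlots(false)) {
                System.out.println("freeing " + slot);
            }
        }
    }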
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index d72d362..d1ee091 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -49,13 +49,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.*;
 import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -104,6 +105,75 @@ public class TestDefaultResourceAwareStrategy {
         }
     }
 
+    /*
+     * test scheduling does not cause negative resources
+     */
+    @Test
+    public void testSchedulingNegativeResources() {
+        int spoutParallelism = 2;
+        int boltParallelism = 2;
+        double cpuPercent = 10;
+        double memoryOnHeap = 10;
+        double memoryOffHeap = 10;
+        double sharedOnHeapWithinWorker = 400;
+        double sharedOffHeapWithinNode = 700;
+        double sharedOffHeapWithinWorker = 500;
+
+        Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
+        TopologyDetails[] topo = new TopologyDetails[2];
+
+        // 1st topology
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+                spoutParallelism);
+        builder.setBolt("bolt-1", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
+        builder.setBolt("bolt-3", new TestBolt(),
+                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
+        StormTopology stormToplogy = builder.createTopology();
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 1);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology-0");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
+        topo[0] = new TopologyDetails("testTopology-id-0", conf, stormToplogy, 0,
+                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+        // 2nd topology
+        builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestSpout(),
+                spoutParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "spout shared off heap within node"));
+        stormToplogy = builder.createTopology();
+
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology-1");
+        topo[1] = new TopologyDetails("testTopology-id-1", conf, stormToplogy, 0,
+                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+        Map<String, SupervisorDetails> supMap = genSupervisors(1, 4, 500, 2000);
+        Topologies topologies = new Topologies(topo[0]);
+        Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+        // schedule 1st topology
+        scheduler = new ResourceAwareScheduler();
+        scheduler.prepare(conf);
+        scheduler.schedule(topologies, cluster);
+        assertTopologiesFullyScheduled(cluster, topo[0].getName());
+
+        // attempt scheduling both topologies.
+        // this triggered negative resource event as the second topology incorrectly scheduled with the first in place
+        // first topology should get evicted for higher priority (lower value) second topology to successfully schedule
+        topologies = new Topologies(topo[0], topo[1]);
+        cluster = new Cluster(cluster, topologies);
+        scheduler.schedule(topologies, cluster);
+        assertTopologiesNotScheduled(cluster, topo[0].getName());
+        assertTopologiesFullyScheduled(cluster, topo[1].getName());
+
+        // check negative resource count
+        assertThat(cluster.getResourceMetrics().getNegativeResourceEventsMeter().getCount(), is(0L));
+    }
+
     /**
      * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
      */
@@ -115,25 +185,26 @@ public class TestDefaultResourceAwareStrategy {
         double cpuPercent = 10;
         double memoryOnHeap = 10;
         double memoryOffHeap = 10;
-        double sharedOnHeap = 500;
-        double sharedOffHeapNode = 700;
-        double sharedOffHeapWorker = 500;
+        double sharedOnHeapWithinWorker = 400;
+        double sharedOffHeapWithinNode = 700;
+        double sharedOffHeapWithinWorker = 600;
+
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new TestSpout(),
                 spoutParallelism);
         builder.setBolt("bolt-1", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWithinWorker, "bolt-1 shared off heap within worker")).shuffleGrouping("spout");
         builder.setBolt("bolt-2", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
+                boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapWithinNode, "bolt-2 shared off heap within node")).shuffleGrouping("bolt-1");
         builder.setBolt("bolt-3", new TestBolt(),
-                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared worker")).shuffleGrouping("bolt-2");
+                boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeapWithinWorker, "bolt-3 shared on heap within worker")).shuffleGrouping("bolt-2");
 
         StormTopology stormToplogy = builder.createTopology();
 
         INimbus iNimbus = new INimbusTest();
         Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 500, 2000);
         Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
-        
+
         conf.put(Config.TOPOLOGY_PRIORITY, 0);
         conf.put(Config.TOPOLOGY_NAME, "testTopology");
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
@@ -144,10 +215,9 @@ public class TestDefaultResourceAwareStrategy {
         Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
 
         scheduler = new ResourceAwareScheduler();
-
         scheduler.prepare(conf);
         scheduler.schedule(topologies, cluster);
-        
+
         for (Entry<String, SupervisorResources> entry: cluster.getSupervisorsResourcesMap().entrySet()) {
             String supervisorId = entry.getKey();
             SupervisorResources resources = entry.getValue();
@@ -158,25 +228,24 @@ public class TestDefaultResourceAwareStrategy {
         // Everything should fit in a single slot
         int totalNumberOfTasks = (spoutParallelism + (boltParallelism * numBolts));
         double totalExpectedCPU = totalNumberOfTasks * cpuPercent;
-        double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeap;
-        double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWorker;
-        
+        double totalExpectedOnHeap = (totalNumberOfTasks * memoryOnHeap) + sharedOnHeapWithinWorker;
+        double totalExpectedWorkerOffHeap = (totalNumberOfTasks * memoryOffHeap) + sharedOffHeapWithinWorker;
+
         SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
-        assertEquals(1, assignment.getSlots().size());
+        assertThat(assignment.getSlots().size(), is(1));
         WorkerSlot ws = assignment.getSlots().iterator().next();
         String nodeId = ws.getNodeId();
-        assertEquals(1, assignment.getNodeIdToTotalSharedOffHeapMemory().size());
-        assertEquals(sharedOffHeapNode, assignment.getNodeIdToTotalSharedOffHeapMemory().get(nodeId), 0.01);
-        assertEquals(1, assignment.getScheduledResources().size());
+        assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().size(), is(1));
+        assertThat(assignment.getNodeIdToTotalSharedOffHeapNodeMemory().get(nodeId), closeTo(sharedOffHeapWithinNode, 0.01));
+        assertThat(assignment.getScheduledResources().size(), is(1));
         WorkerResources resources = assignment.getScheduledResources().get(ws);
-        assertEquals(totalExpectedCPU, resources.get_cpu(), 0.01);
-        assertEquals(totalExpectedOnHeap, resources.get_mem_on_heap(), 0.01);
-        assertEquals(totalExpectedWorkerOffHeap, resources.get_mem_off_heap(), 0.01);
-        assertEquals(sharedOnHeap, resources.get_shared_mem_on_heap(), 0.01);
-        assertEquals(sharedOffHeapWorker, resources.get_shared_mem_off_heap(), 0.01);
+        assertThat(resources.get_cpu(), closeTo(totalExpectedCPU, 0.01));
+        assertThat(resources.get_mem_on_heap(), closeTo(totalExpectedOnHeap, 0.01));
+        assertThat(resources.get_mem_off_heap(), closeTo(totalExpectedWorkerOffHeap, 0.01));
+        assertThat(resources.get_shared_mem_on_heap(), closeTo(sharedOnHeapWithinWorker, 0.01));
+        assertThat(resources.get_shared_mem_off_heap(), closeTo(sharedOffHeapWithinWorker, 0.01));
     }
     
-    
     /**
      * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
      */
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 037d226..033b3cf 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -140,7 +140,7 @@ public class TestGenericResourceAwareStrategy {
         
         SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
         Set<WorkerSlot> slots = assignment.getSlots();
-        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapMemory();
+        Map<String, Double> nodeToTotalShared = assignment.getNodeIdToTotalSharedOffHeapNodeMemory();
         LOG.info("NODE TO SHARED OFF HEAP {}", nodeToTotalShared);
         Map<WorkerSlot, WorkerResources> scheduledResources = assignment.getScheduledResources();
         assertEquals(2, slots.size());