You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:56 UTC

[3/6] storm git commit: Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total

Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total is not a superset of the used resources.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41db23fe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41db23fe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41db23fe

Branch: refs/heads/master
Commit: 41db23fe86299f71b6ecc7deaa70ab731899c83f
Parents: 8aae491
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jan 5 21:56:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:27:04 2018 +0100

----------------------------------------------------------------------
 storm-server/pom.xml                            |   2 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |   2 -
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   5 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |  16 +-
 .../org/apache/storm/scheduler/Cluster.java     |   5 +-
 .../resource/NormalizedResourceOffer.java       |  89 ++--
 .../resource/NormalizedResourceRequest.java     |  65 ++-
 .../scheduler/resource/NormalizedResources.java | 294 ++++++--------
 .../resource/NormalizedResourcesWithMemory.java |  28 ++
 .../storm/scheduler/resource/RAS_Node.java      |   3 -
 .../storm/scheduler/resource/RAS_Nodes.java     |   1 -
 .../resource/ResourceAwareScheduler.java        |   1 -
 .../resource/ResourceMapArrayBridge.java        |  89 ++++
 .../resource/ResourceNameNormalizer.java        |  65 +++
 .../storm/scheduler/resource/ResourceUtils.java |  14 +-
 .../scheduling/BaseResourceAwareStrategy.java   |   6 +-
 .../DefaultResourceAwareStrategy.java           |   2 -
 .../GenericResourceAwareStrategy.java           |   3 -
 .../org/apache/storm/utils/ServerUtils.java     |  16 +-
 .../resource/TestResourceAwareScheduler.java    |   7 +-
 .../TestUtilsForResourceAwareScheduler.java     |   4 +-
 .../normalization/NormalizedResourcesRule.java  |  50 +++
 .../normalization/NormalizedResourcesTest.java  | 404 +++++++++++++++++++
 .../ResourceMapArrayBridgeTest.java             |  65 +++
 .../scheduling/NormalizedResourcesRule.java     |  50 ---
 .../TestDefaultResourceAwareStrategy.java       |   1 +
 26 files changed, 941 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index d6554de..ff917ca 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2630</maxAllowedViolations>
+                    <maxAllowedViolations>2620</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index d660ab0..fd1cf40 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import javax.security.auth.Subject;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index b495cfa..2c785a5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import javax.security.auth.Subject;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -136,7 +135,6 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.nimbus.ITopologyValidator;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.DefaultScheduler;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
@@ -144,14 +142,15 @@ import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAuthorizer;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 87e310e..3faf1e4 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,22 +17,20 @@
  */
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.NormalizedResources;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class SupervisorHeartbeat implements Runnable {
 
      private final IStormClusterState stormClusterState;
@@ -92,7 +90,7 @@ public class SupervisorHeartbeat implements Runnable {
             ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
         }
 
-        return normalizedResourceMap(ret);
+        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 9ab0c93..1ed88c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -37,6 +37,7 @@ import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
 import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.NormalizedResources;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
@@ -44,8 +45,6 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
     private SchedulerAssignmentImpl assignment;
@@ -440,7 +439,7 @@ public class Cluster implements ISchedulingState {
                     Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap()
             );
         }
-        sharedTotalResources = normalizedResourceMap(sharedTotalResources);
+        sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
         WorkerResources ret = new WorkerResources();
         ret.set_resources(totalResources.toNormalizedMap());
         ret.set_shared_resources(sharedTotalResources);

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 58f12d0..787bbce 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -18,68 +18,97 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An offer of resources that has been normalized.
+ * An offer of resources with normalized resource names.
  */
-public class NormalizedResourceOffer extends NormalizedResources {
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
-    private double totalMemory;
+    private final NormalizedResources normalizedResources;
+    private double totalMemoryMb;
 
     /**
-     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
+     * Create a new normalized resource offer.
      *
      * @param resources the resources to be normalized.
      */
     public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
-        super(resources, null);
-        totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
     }
 
     public NormalizedResourceOffer() {
-        this((Map<String, ? extends Number>)null);
+        this((Map<String, ? extends Number>) null);
     }
 
     public NormalizedResourceOffer(NormalizedResourceOffer other) {
-        super(other);
-        this.totalMemory = other.totalMemory;
+        this.totalMemoryMb = other.totalMemoryMb;
+        this.normalizedResources = new NormalizedResources(other.normalizedResources);
     }
 
     @Override
     public double getTotalMemoryMb() {
-        return totalMemory;
+        return totalMemoryMb;
     }
 
-    @Override
-    public String toString() {
-        return super.toString() + " MEM: " + totalMemory;
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+        return ret;
     }
 
-    @Override
-    public Map<String,Double> toNormalizedMap() {
-        Map<String, Double> ret = super.toNormalizedMap();
-        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemory);
-        return ret;
+    public void add(NormalizedResourcesWithMemory other) {
+        normalizedResources.add(other.getNormalizedResources());
+        totalMemoryMb += other.getTotalMemoryMb();
     }
 
-    @Override
-    public void add(NormalizedResources other) {
-        super.add(other);
-        totalMemory += other.getTotalMemoryMb();
+    public void remove(NormalizedResourcesWithMemory other) {
+        normalizedResources.remove(other.getNormalizedResources());
+        totalMemoryMb -= other.getTotalMemoryMb();
+        if (totalMemoryMb < 0.0) {
+            normalizedResources.throwBecauseResourceBecameNegative(
+                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+        };
+    }
+
+    /**
+     * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+     * double, double).
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateAveragePercentageUsedBy(
+            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double)
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double).
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+        return normalizedResources.couldHoldIgnoringSharedMemory(
+            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+    }
+
+    public double getTotalCpu() {
+        return normalizedResources.getTotalCpu();
     }
 
     @Override
-    public void remove(NormalizedResources other) {
-        super.remove(other);
-        totalMemory -= other.getTotalMemoryMb();
-        assert totalMemory >= 0.0;
+    public NormalizedResources getNormalizedResources() {
+        return normalizedResources;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 5963750..2af90ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -18,12 +18,8 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.generated.ComponentCommon;
@@ -36,14 +32,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A request that has been normalized.
+ * A resource request with normalized resource names.
  */
-public class NormalizedResourceRequest extends NormalizedResources {
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
 
     private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
         if (!dest.containsKey(destKey)) {
-            Number value = (Number)src.get(srcKey);
+            Number value = (Number) src.get(srcKey);
             if (value != null) {
                 dest.put(destKey, value.doubleValue());
             }
@@ -51,7 +48,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
     }
 
     private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
-        Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
             Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
         putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
         putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
@@ -96,7 +93,6 @@ public class NormalizedResourceRequest extends NormalizedResources {
                             stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
                     }
 
-
                 }
             }
         } catch (ParseException e) {
@@ -106,48 +102,38 @@ public class NormalizedResourceRequest extends NormalizedResources {
         return topologyResources;
     }
 
+    private final NormalizedResources normalizedResources;
     private double onHeap;
     private double offHeap;
 
-    /**
-     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
-     *
-     * @param resources the resources to be normalized.
-     * @param topologyConf the config for the topology
-     */
     private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-                                     Map<String, Object> topologyConf) {
-        super(resources, getDefaultResources(topologyConf));
-        initializeMemory(getNormalizedResources());
+        Map<String, Double> defaultResources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        normalizedResources = new NormalizedResources(normalizedResourceMap);
     }
 
     public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
-        this(parseResources(component.get_json_conf()), topoConf);
+        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
     }
 
     public NormalizedResourceRequest(Map<String, Object> topoConf) {
-        this((Map<String, ? extends Number>) null, topoConf);
+        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
     }
 
     public NormalizedResourceRequest() {
-        super(null, null);
-        initializeMemory(getNormalizedResources());
+        this((Map<String, ? extends Number>) null, null);
     }
 
-    @Override
-    public Map<String,Double> toNormalizedMap() {
-        Map<String, Double> ret = super.toNormalizedMap();
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
         ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
         ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
         return ret;
     }
 
-    private void initializeMemory(Map<String, Double> normalizedResources) {
-        onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-    }
-
     public double getOnHeapMemoryMb() {
         return onHeap;
     }
@@ -166,17 +152,17 @@ public class NormalizedResourceRequest extends NormalizedResources {
 
     /**
      * Add the resources in other to this.
+     *
      * @param other the other Request to add to this.
      */
     public void add(NormalizedResourceRequest other) {
-        super.add(other);
+        this.normalizedResources.add(other.normalizedResources);
         onHeap += other.onHeap;
         offHeap += other.offHeap;
     }
 
-    @Override
     public void add(WorkerResources value) {
-        super.add(value);
+        this.normalizedResources.add(value);
         //The resources are already normalized
         Map<String, Double> resources = value.get_resources();
         onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
@@ -185,11 +171,20 @@ public class NormalizedResourceRequest extends NormalizedResources {
 
     @Override
     public double getTotalMemoryMb() {
-        return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+        return onHeap + offHeap;
     }
 
     @Override
     public String toString() {
         return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
     }
+
+    public double getTotalCpu() {
+        return this.normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return this.normalizedResources;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index 846ab69..f8e911a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -20,74 +20,39 @@ package org.apache.storm.scheduler.resource;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
+import org.apache.commons.lang.Validate;
 import org.apache.storm.Constants;
 import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Resources that have been normalized.
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
  */
-public abstract class NormalizedResources {
+public class NormalizedResources {
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-    public static final Map<String, String> RESOURCE_NAME_MAPPING;
 
-    static {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
-        RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
-    }
+    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
 
-    private static double[] makeArray(Map<String, Double> normalizedResources) {
-        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
-        for (String key : normalizedResources.keySet()) {
-            //We are going to skip over CPU and Memory, because they are captured elsewhere
-            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
-                resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
-            }
-        }
-        //By default all of the values are 0
-        double[] ret = new double[counter.get()];
-        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
-            Integer index = resourceNames.get(entry.getKey());
-            if (index != null) {
-                //index == null if it is memory or CPU
-                ret[index] = entry.getValue();
-            }
-        }
-        return ret;
+    static {
+        resetResourceNames();
     }
 
-    private static final ConcurrentMap<String, Integer> resourceNames = new ConcurrentHashMap<>();
-    private static final AtomicInteger counter = new AtomicInteger(0);
     private double cpu;
     private double[] otherResources;
-    private final Map<String, Double> normalizedResources;
 
     /**
-     * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
      * algorithms sadly have different behavior if a resource exists or not.
      */
     @VisibleForTesting
     public static void resetResourceNames() {
-        resourceNames.clear();
-        counter.set(0);
+        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
     }
 
     /**
@@ -96,53 +61,21 @@ public abstract class NormalizedResources {
     public NormalizedResources(NormalizedResources other) {
         cpu = other.cpu;
         otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
-        normalizedResources = other.normalizedResources;
     }
 
     /**
-     * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
+     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+     * offers because of how on heap vs off heap is used.
      *
-     * @param resources the resources to be normalized.
-     * @param defaults the default resources that will also be normalized and combined with the real resources.
+     * @param normalizedResources the normalized resource map
+     * @param getTotalMemoryMb Supplier of total memory in MB.
      */
-    public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
-        normalizedResources = normalizedResourceMap(defaults);
-        normalizedResources.putAll(normalizedResourceMap(resources));
+    public NormalizedResources(Map<String, Double> normalizedResources) {
         cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        otherResources = makeArray(normalizedResources);
-    }
-
-    protected final Map<String, Double> getNormalizedResources() {
-        return this.normalizedResources;
+        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
     }
 
     /**
-     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
-     *
-     * @param resourceMap resource map of either Supervisor or Topology
-     * @return the resource map with common resource names
-     */
-    public static Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
-        if (resourceMap == null) {
-            return new HashMap<>();
-        }
-        return new HashMap<>(resourceMap.entrySet().stream()
-            .collect(Collectors.toMap(
-                //Map the key if needed
-                (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()),
-                //Map the value
-                (e) -> e.getValue().doubleValue())));
-    }
-
-    /**
-     * Get the total amount of memory.
-     *
-     * @return the total amount of memory requested or provided.
-     */
-    public abstract double getTotalMemoryMb();
-
-    /**
      * Get the total amount of cpu.
      *
      * @return the amount of cpu.
@@ -150,15 +83,18 @@ public abstract class NormalizedResources {
     public double getTotalCpu() {
         return cpu;
     }
+    
+    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+        if (requiredLength > otherResources.length) {
+            double[] newResources = new double[requiredLength];
+            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+            otherResources = newResources;
+        }
+    }
 
     private void add(double[] resourceArray) {
         int otherLength = resourceArray.length;
-        int length = otherResources.length;
-        if (otherLength > length) {
-            double[] newResources = new double[otherLength];
-            System.arraycopy(newResources, 0, otherResources, 0, length);
-            otherResources = newResources;
-        }
+        zeroPadOtherResourcesIfNecessary(otherLength);
         for (int i = 0; i < otherLength; i++) {
             otherResources[i] += resourceArray[i];
         }
@@ -177,45 +113,39 @@ public abstract class NormalizedResources {
     public void add(WorkerResources value) {
         Map<String, Double> workerNormalizedResources = value.get_resources();
         cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        add(makeArray(workerNormalizedResources));
+        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
     }
 
+    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+            resourceName, currentValue, subtractedValue));
+    }
+    
     /**
      * Remove the other resources from this. This is the same as subtracting the resources in other from this.
      *
      * @param other the resources we want removed.
+     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
      */
     public void remove(NormalizedResources other) {
         this.cpu -= other.cpu;
-        assert cpu >= 0.0;
-        int otherLength = other.otherResources.length;
-        int length = otherResources.length;
-        if (otherLength > length) {
-            double[] newResources = new double[otherLength];
-            System.arraycopy(newResources, 0, otherResources, 0, length);
-            otherResources = newResources;
+        if (cpu < 0.0) {
+            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
         }
+        int otherLength = other.otherResources.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
         for (int i = 0; i < otherLength; i++) {
             otherResources[i] -= other.otherResources[i];
-            assert otherResources[i] >= 0.0;
+            if (otherResources[i] < 0.0) {
+                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+            }
         }
     }
 
     @Override
     public String toString() {
-        return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
-    }
-
-    private Map<String, Double> toNormalizedOtherResources() {
-        Map<String, Double> ret = new HashMap<>();
-        int length = otherResources.length;
-        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
-            int index = entry.getValue();
-            if (index < length) {
-                ret.put(entry.getKey(), otherResources[index]);
-            }
-        }
-        return ret;
+        return "Normalized resources: " + toNormalizedMap();
     }
 
     /**
@@ -223,7 +153,7 @@ public abstract class NormalizedResources {
      * user.
      */
     public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = toNormalizedOtherResources();
+        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
         ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
         return ret;
     }
@@ -240,9 +170,11 @@ public abstract class NormalizedResources {
      * does not check memory because with shared memory it is beyond the scope of this.
      *
      * @param other the resources that we want to check if they would fit in this.
+     * @param thisTotalMemoryMb The total memory in MB of this
+     * @param otherTotalMemoryMb The total memory in MB of other
      * @return true if it might fit, else false if it could not possibly fit.
      */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other) {
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
         if (this.cpu < other.getTotalCpu()) {
             return false;
         }
@@ -253,69 +185,70 @@ public abstract class NormalizedResources {
             }
         }
 
-        if (this.getTotalMemoryMb() < other.getTotalMemoryMb()) {
-            return false;
-        }
-        return true;
+        return thisTotalMemoryMb >= otherTotalMemoryMb;
     }
 
-    private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
-        String resourceName = null;
-        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+    private String getResourceNameForResourceIndex(int resourceIndex) {
+        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
             int index = entry.getValue();
             if (index == resourceIndex) {
-                resourceName = entry.getKey();
-                break;
+                return entry.getKey();
             }
         }
-        if (resourceName == null) {
-            throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
-                + " This should not be possible, and is likely a bug in the Storm code.");
-        }
-        throw new IllegalArgumentException("Total resources does not contain resource '"
-            + resourceName
-            + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+        return null;
+    }
+
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
     }
 
     /**
-     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
      *
      * @param used the amount of resources used.
-     * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the average percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
      */
-    public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
         int skippedResourceTypes = 0;
         double total = 0.0;
-        double totalMemory = getTotalMemoryMb();
-        if (totalMemory != 0.0) {
-            total += used.getTotalMemoryMb() / totalMemory;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            total += usedMemoryMb / totalMemoryMb;
         } else {
             skippedResourceTypes++;
         }
         double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > getTotalCpu()) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
         if (totalCpu != 0.0) {
             total += used.getTotalCpu() / getTotalCpu();
         } else {
             skippedResourceTypes++;
         }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
-                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
-                this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
-        }
 
         if (used.otherResources.length > otherResources.length) {
-            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
 
         for (int i = 0; i < otherResources.length; i++) {
             double totalValue = otherResources[i];
-            if (totalValue == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                skippedResourceTypes++;
-                continue;
-            }
             double usedValue;
             if (i >= used.otherResources.length) {
                 //Resources missing from used are using none of that resource
@@ -323,14 +256,24 @@ public abstract class NormalizedResources {
             } else {
                 usedValue = used.otherResources[i];
             }
+            if (usedValue > totalValue) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            if (totalValue == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                skippedResourceTypes++;
+                continue;
+            }
+
             total += usedValue / totalValue;
         }
         //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
         int divisor = 2 + otherResources.length - skippedResourceTypes;
         if (divisor == 0) {
-            /*This is an arbitrary choice to make the result consistent with calculateMin.
-             Any value would be valid here, becase there are no (non-zero) resources in the total set of resources,
-             so we're trying to average 0 values.
+            /*
+             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, becase there are
+             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
              */
             return 100.0;
         } else {
@@ -339,41 +282,43 @@ public abstract class NormalizedResources {
     }
 
     /**
-     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
      *
      * @param used the amount of resources used.
-     * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the minimum percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
      */
-    public double calculateMinPercentageUsedBy(NormalizedResources used) {
-        double totalMemory = getTotalMemoryMb();
-        double totalCpu = getTotalCpu();
-
+    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
-                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
-                toNormalizedOtherResources(), used.toNormalizedOtherResources());
+            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
         }
 
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+        double min = 1.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
-
-        if (used.otherResources.length != otherResources.length
-            || totalMemory == 0.0
-            || totalCpu == 0.0) {
-            //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0
-            // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
-            return 0.0;
+        if (totalMemoryMb != 0.0) {
+            min = Math.min(min, usedMemoryMb / totalMemoryMb);
         }
-
-        double min = 100.0;
-        if (totalMemory != 0.0) {
-            min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > totalCpu) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
         if (totalCpu != 0.0) {
             min = Math.min(min, used.getTotalCpu() / totalCpu);
         }
 
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        
         for (int i = 0; i < otherResources.length; i++) {
             if (otherResources[i] == 0.0) {
                 //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
@@ -384,6 +329,9 @@ public abstract class NormalizedResources {
                 //Resources missing from used are using none of that resource
                 return 0;
             }
+            if (used.otherResources[i] > otherResources[i]) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
             min = Math.min(min, used.otherResources[i] / otherResources[i]);
         }
         return min * 100.0;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..640233a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+    NormalizedResources getNormalizedResources();
+
+    double getTotalMemoryMb();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5350005..db9ca73 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -25,14 +25,11 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
-import org.apache.storm.Constants;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 6c70ff1..2bd2b86 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index dc557ff..e730705 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -30,7 +30,6 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SingleTopologyCluster;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..cf4f80b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    /**
+     * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+     * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+     * this method, as they are expected to be captured elsewhere.
+     *
+     * @param normalizedResources The resources to translate to an array
+     * @return The array of resource values
+     */
+    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
+        for (String key : normalizedResources.keySet()) {
+            //We are going to skip over CPU and Memory, because they are captured elsewhere
+            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+            }
+        }
+        //By default all of the values are 0
+        double[] ret = new double[counter.get()];
+        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+            if (index != null) {
+                //index == null if it is memory or CPU
+                ret[index] = entry.getValue();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Translates an array of resource values to a normalized resource map.
+     *
+     * @param resources The resource array to translate
+     * @return The normalized resource map
+     */
+    public Map<String, Double> translateFromResourceArray(double[] resources) {
+        Map<String, Double> ret = new HashMap<>();
+        int length = resources.length;
+        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+            int index = entry.getValue();
+            if (index < length) {
+                ret.put(entry.getKey(), resources[index]);
+            }
+        }
+        return ret;
+    }
+
+    public Map<String, Integer> getResourceNamesToArrayIndex() {
+        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
new file mode 100644
index 0000000..d59ba92
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Provides resource name normalization for resource maps.
+ */
+public class ResourceNameNormalizer {
+
+    private final Map<String, String> resourceNameMapping;
+
+    public ResourceNameNormalizer() {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+        resourceNameMapping = Collections.unmodifiableMap(tmp);
+    }
+
+    /**
+     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+     *
+     * @param resourceMap resource map of either Supervisor or Topology
+     * @return the resource map with common resource names
+     */
+    public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+        if (resourceMap == null) {
+            return new HashMap<>();
+        }
+        return new HashMap<>(resourceMap.entrySet().stream()
+            .collect(Collectors.toMap(
+                //Map the key if needed
+                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+                //Map the value
+                (e) -> e.getValue().doubleValue())));
+    }
+
+    public Map<String, String> getResourceNameMapping() {
+        return resourceNameMapping;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index db9dbfa..ca4ea63 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -20,9 +20,7 @@ package org.apache.storm.scheduler.resource;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
@@ -33,8 +31,6 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class ResourceUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
 
@@ -95,7 +91,8 @@ public class ResourceUtils {
 
                 if (resourceUpdatesMap.containsKey(spoutName)) {
                     ComponentCommon spoutCommon = spoutSpec.get_common();
-                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(spoutName));
+                    Map<String, Double> resourcesUpdate = NormalizedResources
+                        .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(spoutName));
                     String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
                     spoutCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(spoutName, resourcesUpdate);
@@ -110,7 +107,8 @@ public class ResourceUtils {
 
                 if (resourceUpdatesMap.containsKey(boltName)) {
                     ComponentCommon boltCommon = boltObj.get_common();
-                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(boltName));
+                    Map<String, Double> resourcesUpdate = NormalizedResources
+                        .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(boltName));
                     String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
                     boltCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(boltName, resourcesUpdate);
@@ -128,7 +126,7 @@ public class ResourceUtils {
     }
 
     public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
-        for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_MAPPING.entrySet()) {
+        for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) {
             if (entry.getValue().equals(normalizedResourceName)) {
                 return entry.getKey();
             }
@@ -148,7 +146,7 @@ public class ResourceUtils {
                     );
 
             for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
-                if (NormalizedResources.RESOURCE_NAME_MAPPING.containsValue(resourceUpdateEntry.getKey())) {
+                if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) {
                     // if there will be legacy values they will be in the outer conf
                     jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
                     componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 67bc867..73e1aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -19,7 +19,6 @@
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -31,18 +30,15 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
-
-import org.apache.storm.executor.Executor;
 import org.apache.storm.generated.ComponentType;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index e0dc881..efa3ecf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -23,13 +23,11 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.TreeSet;
-
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
-
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 108dc49..2b0ca40 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -21,16 +21,13 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeSet;
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index a64c9b4..6f33251 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -25,28 +25,27 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
-import java.nio.file.Files;
 import java.nio.file.FileSystems;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
 import java.util.List;
 import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 import javax.security.auth.Subject;
-
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.exec.CommandLine;
@@ -54,16 +53,15 @@ import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.blobstore.LocalModeClientBlobStore;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.DaemonConfig;
 import org.apache.storm.generated.AccessControl;
 import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.AuthorizationException;
@@ -73,8 +71,8 @@ import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index eb5f70f..360db3a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -501,9 +501,6 @@ public class TestResourceAwareScheduler {
     }
 
     public void testHeterogeneousCluster(Config topologyConf, String strategyName) {
-        Map<String, Double> test = new HashMap<>();
-        test.put("gpu.count", 0.0);
-        new NormalizedResourceOffer(test);
         LOG.info("\n\n\t\ttestHeterogeneousCluster");
         INimbus iNimbus = new INimbusTest();
         Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node
@@ -513,8 +510,8 @@ public class TestResourceAwareScheduler {
         resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
         resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
 
-        resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1);
-        resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2);
+        resourceMap1 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap1);
+        resourceMap2 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap2);
 
         Map<String, SupervisorDetails> supMap = new HashMap<>();
         for (int i = 0; i < 2; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 4a2e644..3d37ad9 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -65,8 +65,6 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class TestUtilsForResourceAwareScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
@@ -155,7 +153,7 @@ public class TestUtilsForResourceAwareScheduler {
             for (int j = 0; j < numPorts; j++) {
                 ports.add(j);
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, normalizedResourceMap(resourceMap));
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
             retList.put(sup.getId(), sup);
         }
         return retList;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
new file mode 100644
index 0000000..1f78f53
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class NormalizedResourcesRule implements TestRule {
+
+    public static class NRStatement extends Statement {
+        private final Statement base;
+
+        public NRStatement(Statement base) {
+            this.base = base;
+        }
+
+        @Override
+        public void evaluate() throws Throwable {
+            NormalizedResources.resetResourceNames();
+            try {
+                base.evaluate();
+            } finally {
+                NormalizedResources.resetResourceNames();
+            }
+        }
+    }
+
+    @Override
+    public Statement apply(Statement statement, Description description) {
+        return new NRStatement(statement);
+    }
+}