You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:54 UTC

[1/6] storm git commit: STORM-2859: Fix a number of issues with NormalizedResources when resource totals are zero

Repository: storm
Updated Branches:
  refs/heads/master 7ecb3d73e -> e8dd1f7e2


STORM-2859: Fix a number of issues with NormalizedResources when resource totals are zero


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8aae491b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8aae491b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8aae491b

Branch: refs/heads/master
Commit: 8aae491b6b1e2be2a085f61d45d5d915203eb0ee
Parents: 873028b
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Dec 13 16:47:31 2017 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Jan 4 16:36:21 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/blobstore/BlobStoreUtils.java  |   4 +-
 .../resource/NormalizedResourceOffer.java       |  10 +-
 .../resource/NormalizedResourceRequest.java     |   7 +-
 .../scheduler/resource/NormalizedResources.java | 191 ++++++++++++++-----
 .../DefaultResourceAwareStrategy.java           |  13 +-
 .../TestDefaultResourceAwareStrategy.java       |  39 ++--
 .../TestGenericResourceAwareStrategy.java       |   4 +-
 7 files changed, 180 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index 9aca91c..d660ab0 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -196,10 +196,10 @@ public class BlobStoreUtils {
                 // Catching and logging KeyNotFoundException because, if
                 // there is a subsequent update and delete, the non-leader
                 // nimbodes might throw an exception.
-                LOG.info("KeyNotFoundException {}", knf);
+                LOG.info("KeyNotFoundException", knf);
             } catch (Exception exp) {
                 // Logging an exception while client is connecting
-                LOG.error("Exception {}", exp);
+                LOG.error("Exception", exp);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 2623655..58f12d0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -34,17 +34,18 @@ public class NormalizedResourceOffer extends NormalizedResources {
     private double totalMemory;
 
     /**
-     * Create a new normalized set of resources.  Note that memory is not covered here becasue it is not consistent in requests vs offers
+     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
      * because of how on heap vs off heap is used.
      *
      * @param resources the resources to be normalized.
      */
     public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
         super(resources, null);
+        totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
     }
 
     public NormalizedResourceOffer() {
-        super(null, null);
+        this((Map<String, ? extends Number>)null);
     }
 
     public NormalizedResourceOffer(NormalizedResourceOffer other) {
@@ -53,11 +54,6 @@ public class NormalizedResourceOffer extends NormalizedResources {
     }
 
     @Override
-    protected void initializeMemory(Map<String, Double> normalizedResources) {
-        totalMemory = normalizedResources.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
-    }
-
-    @Override
     public double getTotalMemoryMb() {
         return totalMemory;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 926184c..5963750 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -110,7 +110,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
     private double offHeap;
 
     /**
-     * Create a new normalized set of resources.  Note that memory is not covered here becasue it is not consistent in requests vs offers
+     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
      * because of how on heap vs off heap is used.
      *
      * @param resources the resources to be normalized.
@@ -119,6 +119,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
     private NormalizedResourceRequest(Map<String, ? extends Number> resources,
                                      Map<String, Object> topologyConf) {
         super(resources, getDefaultResources(topologyConf));
+        initializeMemory(getNormalizedResources());
     }
 
     public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
@@ -131,6 +132,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
 
     public NormalizedResourceRequest() {
         super(null, null);
+        initializeMemory(getNormalizedResources());
     }
 
     @Override
@@ -141,8 +143,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
         return ret;
     }
 
-    @Override
-    protected void initializeMemory(Map<String, Double> normalizedResources) {
+    private void initializeMemory(Map<String, Double> normalizedResources) {
         onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
         offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index eea38cf..846ab69 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
  * Resources that have been normalized.
  */
 public abstract class NormalizedResources {
+
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
     public static final Map<String, String> RESOURCE_NAME_MAPPING;
 
@@ -62,7 +63,7 @@ public abstract class NormalizedResources {
             }
         }
         //By default all of the values are 0
-        double [] ret = new double[counter.get()];
+        double[] ret = new double[counter.get()];
         for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
             Integer index = resourceNames.get(entry.getKey());
             if (index != null) {
@@ -77,11 +78,11 @@ public abstract class NormalizedResources {
     private static final AtomicInteger counter = new AtomicInteger(0);
     private double cpu;
     private double[] otherResources;
+    private final Map<String, Double> normalizedResources;
 
     /**
-     * This is for testing only.  It allows a test to reset the mapping of resource names in the array.
-     * We reset the mapping because some algorithms sadly have different behavior if a resource exists
-     * or not.
+     * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+     * algorithms sadly have different behavior if a resource exists or not.
      */
     @VisibleForTesting
     public static void resetResourceNames() {
@@ -89,34 +90,36 @@ public abstract class NormalizedResources {
         counter.set(0);
     }
 
+    /**
+     * Copy constructor.
+     */
     public NormalizedResources(NormalizedResources other) {
         cpu = other.cpu;
         otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+        normalizedResources = other.normalizedResources;
     }
 
     /**
-     * Create a new normalized set of resources.  Note that memory is not
-     * covered here because it is not consistent in requests vs offers because
-     * of how on heap vs off heap is used.
+     * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
+     * because of how on heap vs off heap is used.
+     *
      * @param resources the resources to be normalized.
      * @param defaults the default resources that will also be normalized and combined with the real resources.
      */
     public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
-        Map<String, Double> normalizedResources = normalizedResourceMap(defaults);
+        normalizedResources = normalizedResourceMap(defaults);
         normalizedResources.putAll(normalizedResourceMap(resources));
         cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
         otherResources = makeArray(normalizedResources);
-        initializeMemory(normalizedResources);
     }
 
-    /**
-     * Initialize any memory usage from the normalized map.
-     * @param normalizedResources the normalized resource map.
-     */
-    protected abstract void initializeMemory(Map<String, Double> normalizedResources);
+    protected final Map<String, Double> getNormalizedResources() {
+        return this.normalizedResources;
+    }
 
     /**
      * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+     *
      * @param resourceMap resource map of either Supervisor or Topology
      * @return the resource map with common resource names
      */
@@ -134,12 +137,14 @@ public abstract class NormalizedResources {
 
     /**
      * Get the total amount of memory.
+     *
      * @return the total amount of memory requested or provided.
      */
     public abstract double getTotalMemoryMb();
 
     /**
      * Get the total amount of cpu.
+     *
      * @return the amount of cpu.
      */
     public double getTotalCpu() {
@@ -150,7 +155,7 @@ public abstract class NormalizedResources {
         int otherLength = resourceArray.length;
         int length = otherResources.length;
         if (otherLength > length) {
-            double [] newResources = new double[otherLength];
+            double[] newResources = new double[otherLength];
             System.arraycopy(newResources, 0, otherResources, 0, length);
             otherResources = newResources;
         }
@@ -166,16 +171,18 @@ public abstract class NormalizedResources {
 
     /**
      * Add the resources from a worker to this.
+     *
      * @param value the worker resources that should be added to this.
      */
     public void add(WorkerResources value) {
-        Map<String, Double> normalizedResources = value.get_resources();
-        cpu += normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        add(makeArray(normalizedResources));
+        Map<String, Double> workerNormalizedResources = value.get_resources();
+        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        add(makeArray(workerNormalizedResources));
     }
 
     /**
-     * Remove the resources from other.  This is the same as subtracting the resources in other from this.
+     * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+     *
      * @param other the resources we want removed.
      */
     public void remove(NormalizedResources other) {
@@ -184,11 +191,11 @@ public abstract class NormalizedResources {
         int otherLength = other.otherResources.length;
         int length = otherResources.length;
         if (otherLength > length) {
-            double [] newResources = new double[otherLength];
+            double[] newResources = new double[otherLength];
             System.arraycopy(newResources, 0, otherResources, 0, length);
             otherResources = newResources;
         }
-        for (int i = 0; i < Math.min(length, otherLength); i++) {
+        for (int i = 0; i < otherLength; i++) {
             otherResources[i] -= other.otherResources[i];
             assert otherResources[i] >= 0.0;
         }
@@ -196,18 +203,13 @@ public abstract class NormalizedResources {
 
     @Override
     public String toString() {
-        return "CPU: " + cpu;
+        return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
     }
 
-    /**
-     * Return a Map of the normalized resource name to a double.  This should only
-     * be used when returning thrift resource requests to the end user.
-     */
-    public Map<String,Double> toNormalizedMap() {
-        HashMap<String, Double> ret = new HashMap<>();
-        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+    private Map<String, Double> toNormalizedOtherResources() {
+        Map<String, Double> ret = new HashMap<>();
         int length = otherResources.length;
-        for (Map.Entry<String, Integer> entry: resourceNames.entrySet()) {
+        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
             int index = entry.getValue();
             if (index < length) {
                 ret.put(entry.getKey(), otherResources[index]);
@@ -216,6 +218,16 @@ public abstract class NormalizedResources {
         return ret;
     }
 
+    /**
+     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+     * user.
+     */
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = toNormalizedOtherResources();
+        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        return ret;
+    }
+
     private double getResourceAt(int index) {
         if (index >= otherResources.length) {
             return 0.0;
@@ -224,8 +236,9 @@ public abstract class NormalizedResources {
     }
 
     /**
-     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory.
-     * It does not check memory because with shared memory it is beyond the scope of this.
+     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
+     * does not check memory because with shared memory it is beyond the scope of this.
+     *
      * @param other the resources that we want to check if they would fit in this.
      * @return true if it might fit, else false if it could not possibly fit.
      */
@@ -246,42 +259,105 @@ public abstract class NormalizedResources {
         return true;
     }
 
+    private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
+        String resourceName = null;
+        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+            int index = entry.getValue();
+            if (index == resourceIndex) {
+                resourceName = entry.getKey();
+                break;
+            }
+        }
+        if (resourceName == null) {
+            throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
+                + " This should not be possible, and is likely a bug in the Storm code.");
+        }
+        throw new IllegalArgumentException("Total resources does not contain resource '"
+            + resourceName
+            + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+    }
+
     /**
-     * Calculate the average resource usage percentage with this being the total resources and
-     * used being the amounts used.
+     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+     *
      * @param used the amount of resources used.
-     * @return the average percentage used 0.0 to 100.0.
+     * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
      */
     public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+        int skippedResourceTypes = 0;
         double total = 0.0;
         double totalMemory = getTotalMemoryMb();
         if (totalMemory != 0.0) {
             total += used.getTotalMemoryMb() / totalMemory;
+        } else {
+            skippedResourceTypes++;
         }
         double totalCpu = getTotalCpu();
         if (totalCpu != 0.0) {
             total += used.getTotalCpu() / getTotalCpu();
+        } else {
+            skippedResourceTypes++;
         }
-        //If total is 0 we add in a 0% used, so we can just skip over anything that is not in both.
-        int length = Math.min(used.otherResources.length, otherResources.length);
-        for (int i = 0; i < length; i++) {
-            if (otherResources[i] != 0.0) {
-                total += used.otherResources[i] / otherResources[i];
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
+                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
+                this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+        }
+
+        for (int i = 0; i < otherResources.length; i++) {
+            double totalValue = otherResources[i];
+            if (totalValue == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                skippedResourceTypes++;
+                continue;
             }
+            double usedValue;
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                usedValue = 0.0;
+            } else {
+                usedValue = used.otherResources[i];
+            }
+            total += usedValue / totalValue;
+        }
+        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+        int divisor = 2 + otherResources.length - skippedResourceTypes;
+        if (divisor == 0) {
+            /*This is an arbitrary choice to make the result consistent with calculateMin.
+             Any value would be valid here, because there are no (non-zero) resources in the total set of resources,
+             so we're trying to average 0 values.
+             */
+            return 100.0;
+        } else {
+            return (total * 100.0) / divisor;
         }
-        //To get the count we divide by we need to take the maximum length because we are doing an average.
-        return (total * 100.0) / (2 + Math.max(otherResources.length, used.otherResources.length));
     }
 
     /**
-     * Calculate the minimum resource usage percentage with this being the total resources and
-     * used being the amounts used.
+     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+     *
      * @param used the amount of resources used.
-     * @return the minimum percentage used 0.0 to 100.0.
+     * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
      */
     public double calculateMinPercentageUsedBy(NormalizedResources used) {
         double totalMemory = getTotalMemoryMb();
         double totalCpu = getTotalCpu();
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
+                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
+                toNormalizedOtherResources(), used.toNormalizedOtherResources());
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+        }
+
         if (used.otherResources.length != otherResources.length
             || totalMemory == 0.0
             || totalCpu == 0.0) {
@@ -289,16 +365,27 @@ public abstract class NormalizedResources {
             // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
             return 0.0;
         }
-        double min = used.getTotalMemoryMb() / totalMemory;
-        min = Math.min(min, used.getTotalCpu() / getTotalCpu());
+
+        double min = 100.0;
+        if (totalMemory != 0.0) {
+            min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+        }
+        if (totalCpu != 0.0) {
+            min = Math.min(min, used.getTotalCpu() / totalCpu);
+        }
 
         for (int i = 0; i < otherResources.length; i++) {
-            if (otherResources[i] != 0.0) {
-                min = Math.min(min, used.otherResources[i] / otherResources[i]);
-            } else {
-                return 0.0; //0 will be the minimum, because we count values not in here as 0
+            if (otherResources[i] == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                continue;
+            }
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                return 0;
             }
+            min = Math.min(min, used.otherResources[i] / otherResources[i]);
         }
         return min * 100.0;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index fc746b0..e0dc881 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -20,11 +20,8 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeSet;
 
 import org.apache.storm.Config;
@@ -33,7 +30,6 @@ import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 
-import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;
@@ -41,7 +37,7 @@ import org.slf4j.LoggerFactory;
 
 public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy implements IStrategy {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
 
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
@@ -125,10 +121,15 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
     protected TreeSet<ObjectResources> sortObjectResources(
             final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
             final ExistingScheduleFunc existingScheduleFunc) {
-
+        
         for (ObjectResources objectResources : allResources.objectResources) {
             objectResources.effectiveResources =
                 allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Effective resources for {} is {}, and numExistingSchedule is {}",
+                    objectResources.id, objectResources.effectiveResources,
+                    existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+            }
         }
 
         TreeSet<ObjectResources> sortedObjectResources =

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 00f8f14..6a3fcbb 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -28,7 +28,6 @@ import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
 import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
@@ -53,6 +52,7 @@ import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareSched
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -65,7 +65,7 @@ import java.util.TreeSet;
 public class TestDefaultResourceAwareStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
 
-    private static int currentTime = 1450418597;
+    private static final int CURRENT_TIME = 1450418597;
 
     private static class TestDNSToSwitchMapping implements DNSToSwitchMapping {
         private final Map<String, String> result;
@@ -124,10 +124,10 @@ public class TestDefaultResourceAwareStrategy {
         conf.put(Config.TOPOLOGY_NAME, "testTopology");
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 2000);
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
-                genExecsAndComps(stormToplogy), currentTime, "user");
+                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
 
         Topologies topologies = new Topologies(topo);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
 
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
@@ -191,10 +191,10 @@ public class TestDefaultResourceAwareStrategy {
         conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
 
         TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
-                genExecsAndComps(stormToplogy), currentTime, "user");
+                genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
 
         Topologies topologies = new Topologies(topo);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
 
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
@@ -239,11 +239,16 @@ public class TestDefaultResourceAwareStrategy {
         //generate some that has alot of cpu but little of memory
         final Map<String, SupervisorDetails> supMapRack4 = genSupervisors(10, 4, 40, 400 + 200 + 10, 1000);
 
+        //Generate some that have neither resource, to verify that the strategy will prioritize this last
+        //Also put a generic resource with 0 value in the resources list, to verify that it doesn't affect the sorting
+        final Map<String, SupervisorDetails> supMapRack5 = genSupervisors(10, 4, 50, 0.0, 0.0, Collections.singletonMap("gpu.count", 0.0));
+
         supMap.putAll(supMapRack0);
         supMap.putAll(supMapRack1);
         supMap.putAll(supMapRack2);
         supMap.putAll(supMapRack3);
         supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
 
         Config config = createClusterConfig(100, 500, 500, null);
         config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
@@ -251,14 +256,14 @@ public class TestDefaultResourceAwareStrategy {
 
         //create test DNSToSwitchMapping plugin
         DNSToSwitchMapping TestNetworkTopographyPlugin =
-            new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4);
+            new TestDNSToSwitchMapping(supMapRack0, supMapRack1, supMapRack2, supMapRack3, supMapRack4, supMapRack5);
 
         //generate topologies
-        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10, "user");
-        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10, "user");
+        TopologyDetails topo1 = genTopology("topo-1", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", config, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
         
         Topologies topologies = new Topologies(topo1, topo2);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
         
         List<String> supHostnames = new LinkedList<>();
         for (SupervisorDetails sup : supMap.values()) {
@@ -284,7 +289,7 @@ public class TestDefaultResourceAwareStrategy {
         TreeSet<ObjectResources> sortedRacks = rs.sortRacks(null, topo1);
         LOG.info("Sorted Racks {}", sortedRacks);
 
-        Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
+        Assert.assertEquals("# of racks sorted", 6, sortedRacks.size());
         Iterator<ObjectResources> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
@@ -294,8 +299,10 @@ public class TestDefaultResourceAwareStrategy {
         Assert.assertEquals("rack-4 should be ordered third", "rack-4", it.next().id);
         // Ranked fourth since rack-3 has a lot of memory but not cpu
         Assert.assertEquals("rack-3 should be ordered fourth", "rack-3", it.next().id);
-        //Ranked last since rack-2 has not cpu resources
+        //Ranked fifth since rack-2 has no cpu resources
         Assert.assertEquals("rack-2 should be ordered fifth", "rack-2", it.next().id);
+        //Ranked last since rack-5 has neither CPU nor memory available
+        assertEquals("Rack-5 should be ordered sixth", "rack-5", it.next().id);
 
         SchedulingResult schedulingResult = rs.schedule(cluster, topo1);
         assert(schedulingResult.isSuccess());
@@ -370,17 +377,17 @@ public class TestDefaultResourceAwareStrategy {
         final List<String> t1UnfavoredHostIds = Arrays.asList("host-1", "host-2", "host-3");
         t1Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, t1UnfavoredHostIds);
         //generate topologies
-        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, currentTime - 2, 10, "user");
+        TopologyDetails topo1 = genTopology("topo-1", t1Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
 
 
         Config t2Conf = new Config();
         t2Conf.putAll(config);
         t2Conf.put(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES, Arrays.asList("host-31", "host-32", "host-33"));
         t2Conf.put(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES, Arrays.asList("host-11", "host-12", "host-13"));
-        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, currentTime - 2, 10, "user");
+        TopologyDetails topo2 = genTopology("topo-2", t2Conf, 8, 0, 2, 0, CURRENT_TIME - 2, 10, "user");
 
         Topologies topologies = new Topologies(topo1, topo2);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
 
         List<String> supHostnames = new LinkedList<>();
         for (SupervisorDetails sup : supMap.values()) {
@@ -393,7 +400,7 @@ public class TestDefaultResourceAwareStrategy {
             String rack = entry.getValue();
             List<String> nodesForRack = rackToNodes.get(rack);
             if (nodesForRack == null) {
-                nodesForRack = new ArrayList<String>();
+                nodesForRack = new ArrayList<>();
                 rackToNodes.put(rack, nodesForRack);
             }
             nodesForRack.add(hostName);

http://git-wip-us.apache.org/repos/asf/storm/blob/8aae491b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index fc44568..f7a2a84 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -100,7 +100,7 @@ public class TestGenericResourceAwareStrategy {
 
         Topologies topologies = new Topologies(topo);
 
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
 
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
@@ -188,7 +188,7 @@ public class TestGenericResourceAwareStrategy {
                 genExecsAndComps(stormToplogy), currentTime, "user");
 
         Topologies topologies = new Topologies(topo);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), topologies, conf);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, conf);
 
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 


[3/6] storm git commit: Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total

Posted by bo...@apache.org.
Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total is not a superset of the used resources.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/41db23fe
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/41db23fe
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/41db23fe

Branch: refs/heads/master
Commit: 41db23fe86299f71b6ecc7deaa70ab731899c83f
Parents: 8aae491
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Jan 5 21:56:54 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:27:04 2018 +0100

----------------------------------------------------------------------
 storm-server/pom.xml                            |   2 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |   2 -
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   5 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |  16 +-
 .../org/apache/storm/scheduler/Cluster.java     |   5 +-
 .../resource/NormalizedResourceOffer.java       |  89 ++--
 .../resource/NormalizedResourceRequest.java     |  65 ++-
 .../scheduler/resource/NormalizedResources.java | 294 ++++++--------
 .../resource/NormalizedResourcesWithMemory.java |  28 ++
 .../storm/scheduler/resource/RAS_Node.java      |   3 -
 .../storm/scheduler/resource/RAS_Nodes.java     |   1 -
 .../resource/ResourceAwareScheduler.java        |   1 -
 .../resource/ResourceMapArrayBridge.java        |  89 ++++
 .../resource/ResourceNameNormalizer.java        |  65 +++
 .../storm/scheduler/resource/ResourceUtils.java |  14 +-
 .../scheduling/BaseResourceAwareStrategy.java   |   6 +-
 .../DefaultResourceAwareStrategy.java           |   2 -
 .../GenericResourceAwareStrategy.java           |   3 -
 .../org/apache/storm/utils/ServerUtils.java     |  16 +-
 .../resource/TestResourceAwareScheduler.java    |   7 +-
 .../TestUtilsForResourceAwareScheduler.java     |   4 +-
 .../normalization/NormalizedResourcesRule.java  |  50 +++
 .../normalization/NormalizedResourcesTest.java  | 404 +++++++++++++++++++
 .../ResourceMapArrayBridgeTest.java             |  65 +++
 .../scheduling/NormalizedResourcesRule.java     |  50 ---
 .../TestDefaultResourceAwareStrategy.java       |   1 +
 26 files changed, 941 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index d6554de..ff917ca 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2630</maxAllowedViolations>
+                    <maxAllowedViolations>2620</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
index d660ab0..fd1cf40 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import javax.security.auth.Subject;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.Config;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index b495cfa..2c785a5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import javax.security.auth.Subject;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -136,7 +135,6 @@ import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.nimbus.ITopologyValidator;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.DefaultScheduler;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
@@ -144,14 +142,15 @@ import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAuthorizer;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 87e310e..3faf1e4 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -17,22 +17,20 @@
  */
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.scheduler.resource.NormalizedResources;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class SupervisorHeartbeat implements Runnable {
 
      private final IStormClusterState stormClusterState;
@@ -92,7 +90,7 @@ public class SupervisorHeartbeat implements Runnable {
             ret.put(stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
         }
 
-        return normalizedResourceMap(ret);
+        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(ret);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 9ab0c93..1ed88c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -37,6 +37,7 @@ import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
 import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.NormalizedResources;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
@@ -44,8 +45,6 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
     private SchedulerAssignmentImpl assignment;
@@ -440,7 +439,7 @@ public class Cluster implements ISchedulingState {
                     Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, shared.get_on_heap()
             );
         }
-        sharedTotalResources = normalizedResourceMap(sharedTotalResources);
+        sharedTotalResources = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
         WorkerResources ret = new WorkerResources();
         ret.set_resources(totalResources.toNormalizedMap());
         ret.set_shared_resources(sharedTotalResources);

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
index 58f12d0..787bbce 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
@@ -18,68 +18,97 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An offer of resources that has been normalized.
+ * An offer of resources with normalized resource names.
  */
-public class NormalizedResourceOffer extends NormalizedResources {
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
-    private double totalMemory;
+    private final NormalizedResources normalizedResources;
+    private double totalMemoryMb;
 
     /**
-     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
+     * Create a new normalized resource offer.
      *
      * @param resources the resources to be normalized.
      */
     public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
-        super(resources, null);
-        totalMemory = getNormalizedResources().getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
     }
 
     public NormalizedResourceOffer() {
-        this((Map<String, ? extends Number>)null);
+        this((Map<String, ? extends Number>) null);
     }
 
     public NormalizedResourceOffer(NormalizedResourceOffer other) {
-        super(other);
-        this.totalMemory = other.totalMemory;
+        this.totalMemoryMb = other.totalMemoryMb;
+        this.normalizedResources = new NormalizedResources(other.normalizedResources);
     }
 
     @Override
     public double getTotalMemoryMb() {
-        return totalMemory;
+        return totalMemoryMb;
     }
 
-    @Override
-    public String toString() {
-        return super.toString() + " MEM: " + totalMemory;
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+        return ret;
     }
 
-    @Override
-    public Map<String,Double> toNormalizedMap() {
-        Map<String, Double> ret = super.toNormalizedMap();
-        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemory);
-        return ret;
+    public void add(NormalizedResourcesWithMemory other) {
+        normalizedResources.add(other.getNormalizedResources());
+        totalMemoryMb += other.getTotalMemoryMb();
     }
 
-    @Override
-    public void add(NormalizedResources other) {
-        super.add(other);
-        totalMemory += other.getTotalMemoryMb();
+    public void remove(NormalizedResourcesWithMemory other) {
+        normalizedResources.remove(other.getNormalizedResources());
+        totalMemoryMb -= other.getTotalMemoryMb();
+        if (totalMemoryMb < 0.0) {
+            normalizedResources.throwBecauseResourceBecameNegative(
+                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+        };
+    }
+
+    /**
+     * @see NormalizedResources#calculateAveragePercentageUsedBy(NormalizedResources,
+     * double, double)
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateAveragePercentageUsedBy(
+            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#calculateMinPercentageUsedBy(NormalizedResources, double,
+     * double)
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#couldHoldIgnoringSharedMemory(NormalizedResources, double,
+     * double)
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+        return normalizedResources.couldHoldIgnoringSharedMemory(
+            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+    }
+
+    public double getTotalCpu() {
+        return normalizedResources.getTotalCpu();
     }
 
     @Override
-    public void remove(NormalizedResources other) {
-        super.remove(other);
-        totalMemory -= other.getTotalMemoryMb();
-        assert totalMemory >= 0.0;
+    public NormalizedResources getNormalizedResources() {
+        return normalizedResources;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
index 5963750..2af90ab 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
@@ -18,12 +18,8 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.generated.ComponentCommon;
@@ -36,14 +32,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A request that has been normalized.
+ * A resource request with normalized resource names.
  */
-public class NormalizedResourceRequest extends NormalizedResources {
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
 
     private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
         if (!dest.containsKey(destKey)) {
-            Number value = (Number)src.get(srcKey);
+            Number value = (Number) src.get(srcKey);
             if (value != null) {
                 dest.put(destKey, value.doubleValue());
             }
@@ -51,7 +48,7 @@ public class NormalizedResourceRequest extends NormalizedResources {
     }
 
     private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
-        Map<String, Double> ret = normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
             Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
         putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
         putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
@@ -96,7 +93,6 @@ public class NormalizedResourceRequest extends NormalizedResources {
                             stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
                     }
 
-
                 }
             }
         } catch (ParseException e) {
@@ -106,48 +102,38 @@ public class NormalizedResourceRequest extends NormalizedResources {
         return topologyResources;
     }
 
+    private final NormalizedResources normalizedResources;
     private double onHeap;
     private double offHeap;
 
-    /**
-     * Create a new normalized set of resources.  Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
-     *
-     * @param resources the resources to be normalized.
-     * @param topologyConf the config for the topology
-     */
     private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-                                     Map<String, Object> topologyConf) {
-        super(resources, getDefaultResources(topologyConf));
-        initializeMemory(getNormalizedResources());
+        Map<String, Double> defaultResources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        normalizedResources = new NormalizedResources(normalizedResourceMap);
     }
 
     public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
-        this(parseResources(component.get_json_conf()), topoConf);
+        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
     }
 
     public NormalizedResourceRequest(Map<String, Object> topoConf) {
-        this((Map<String, ? extends Number>) null, topoConf);
+        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
     }
 
     public NormalizedResourceRequest() {
-        super(null, null);
-        initializeMemory(getNormalizedResources());
+        this((Map<String, ? extends Number>) null, null);
     }
 
-    @Override
-    public Map<String,Double> toNormalizedMap() {
-        Map<String, Double> ret = super.toNormalizedMap();
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
         ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
         ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
         return ret;
     }
 
-    private void initializeMemory(Map<String, Double> normalizedResources) {
-        onHeap = normalizedResources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap = normalizedResources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-    }
-
     public double getOnHeapMemoryMb() {
         return onHeap;
     }
@@ -166,17 +152,17 @@ public class NormalizedResourceRequest extends NormalizedResources {
 
     /**
      * Add the resources in other to this.
+     *
      * @param other the other Request to add to this.
      */
     public void add(NormalizedResourceRequest other) {
-        super.add(other);
+        this.normalizedResources.add(other.normalizedResources);
         onHeap += other.onHeap;
         offHeap += other.offHeap;
     }
 
-    @Override
     public void add(WorkerResources value) {
-        super.add(value);
+        this.normalizedResources.add(value);
         //The resources are already normalized
         Map<String, Double> resources = value.get_resources();
         onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
@@ -185,11 +171,20 @@ public class NormalizedResourceRequest extends NormalizedResources {
 
     @Override
     public double getTotalMemoryMb() {
-        return getOnHeapMemoryMb() + getOffHeapMemoryMb();
+        return onHeap + offHeap;
     }
 
     @Override
     public String toString() {
         return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
     }
+
+    public double getTotalCpu() {
+        return this.normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return this.normalizedResources;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
index 846ab69..f8e911a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
@@ -20,74 +20,39 @@ package org.apache.storm.scheduler.resource;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
+import org.apache.commons.lang.Validate;
 import org.apache.storm.Constants;
 import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Resources that have been normalized.
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
  */
-public abstract class NormalizedResources {
+public class NormalizedResources {
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-    public static final Map<String, String> RESOURCE_NAME_MAPPING;
 
-    static {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
-        RESOURCE_NAME_MAPPING = Collections.unmodifiableMap(tmp);
-    }
+    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
 
-    private static double[] makeArray(Map<String, Double> normalizedResources) {
-        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
-        for (String key : normalizedResources.keySet()) {
-            //We are going to skip over CPU and Memory, because they are captured elsewhere
-            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
-                resourceNames.computeIfAbsent(key, (k) -> counter.getAndIncrement());
-            }
-        }
-        //By default all of the values are 0
-        double[] ret = new double[counter.get()];
-        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
-            Integer index = resourceNames.get(entry.getKey());
-            if (index != null) {
-                //index == null if it is memory or CPU
-                ret[index] = entry.getValue();
-            }
-        }
-        return ret;
+    static {
+        resetResourceNames();
     }
 
-    private static final ConcurrentMap<String, Integer> resourceNames = new ConcurrentHashMap<>();
-    private static final AtomicInteger counter = new AtomicInteger(0);
     private double cpu;
     private double[] otherResources;
-    private final Map<String, Double> normalizedResources;
 
     /**
-     * This is for testing only. It allows a test to reset the mapping of resource names in the array. We reset the mapping because some
+     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
      * algorithms sadly have different behavior if a resource exists or not.
      */
     @VisibleForTesting
     public static void resetResourceNames() {
-        resourceNames.clear();
-        counter.set(0);
+        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
     }
 
     /**
@@ -96,53 +61,21 @@ public abstract class NormalizedResources {
     public NormalizedResources(NormalizedResources other) {
         cpu = other.cpu;
         otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
-        normalizedResources = other.normalizedResources;
     }
 
     /**
-     * Create a new normalized set of resources. Note that memory is not covered here because it is not consistent in requests vs offers
-     * because of how on heap vs off heap is used.
+     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+     * offers because of how on heap vs off heap is used.
      *
-     * @param resources the resources to be normalized.
-     * @param defaults the default resources that will also be normalized and combined with the real resources.
+     * @param normalizedResources the normalized resource map. The CPU entry and any generic (non-memory) resources are read from
+     *     it; memory entries are not stored by this class.
      */
-    public NormalizedResources(Map<String, ? extends Number> resources, Map<String, ? extends Number> defaults) {
-        normalizedResources = normalizedResourceMap(defaults);
-        normalizedResources.putAll(normalizedResourceMap(resources));
+    public NormalizedResources(Map<String, Double> normalizedResources) {
         cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        otherResources = makeArray(normalizedResources);
-    }
-
-    protected final Map<String, Double> getNormalizedResources() {
-        return this.normalizedResources;
+        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
     }
 
     /**
-     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
-     *
-     * @param resourceMap resource map of either Supervisor or Topology
-     * @return the resource map with common resource names
-     */
-    public static Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
-        if (resourceMap == null) {
-            return new HashMap<>();
-        }
-        return new HashMap<>(resourceMap.entrySet().stream()
-            .collect(Collectors.toMap(
-                //Map the key if needed
-                (e) -> RESOURCE_NAME_MAPPING.getOrDefault(e.getKey(), e.getKey()),
-                //Map the value
-                (e) -> e.getValue().doubleValue())));
-    }
-
-    /**
-     * Get the total amount of memory.
-     *
-     * @return the total amount of memory requested or provided.
-     */
-    public abstract double getTotalMemoryMb();
-
-    /**
      * Get the total amount of cpu.
      *
      * @return the amount of cpu.
@@ -150,15 +83,18 @@ public abstract class NormalizedResources {
     public double getTotalCpu() {
         return cpu;
     }
+    
+    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+        if (requiredLength > otherResources.length) {
+            double[] newResources = new double[requiredLength];
+            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+            otherResources = newResources;
+        }
+    }
 
     private void add(double[] resourceArray) {
         int otherLength = resourceArray.length;
-        int length = otherResources.length;
-        if (otherLength > length) {
-            double[] newResources = new double[otherLength];
-            System.arraycopy(newResources, 0, otherResources, 0, length);
-            otherResources = newResources;
-        }
+        zeroPadOtherResourcesIfNecessary(otherLength);
         for (int i = 0; i < otherLength; i++) {
             otherResources[i] += resourceArray[i];
         }
@@ -177,45 +113,39 @@ public abstract class NormalizedResources {
     public void add(WorkerResources value) {
         Map<String, Double> workerNormalizedResources = value.get_resources();
         cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        add(makeArray(workerNormalizedResources));
+        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
     }
 
+    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+            resourceName, currentValue, subtractedValue));
+    }
+    
     /**
      * Remove the other resources from this. This is the same as subtracting the resources in other from this.
      *
      * @param other the resources we want removed.
+     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
      */
     public void remove(NormalizedResources other) {
         this.cpu -= other.cpu;
-        assert cpu >= 0.0;
-        int otherLength = other.otherResources.length;
-        int length = otherResources.length;
-        if (otherLength > length) {
-            double[] newResources = new double[otherLength];
-            System.arraycopy(newResources, 0, otherResources, 0, length);
-            otherResources = newResources;
+        if (cpu < 0.0) {
+            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
         }
+        int otherLength = other.otherResources.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
         for (int i = 0; i < otherLength; i++) {
             otherResources[i] -= other.otherResources[i];
-            assert otherResources[i] >= 0.0;
+            if (otherResources[i] < 0.0) {
+                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+            }
         }
     }
 
     @Override
     public String toString() {
-        return "CPU: " + cpu + " Other resources: " + toNormalizedOtherResources();
-    }
-
-    private Map<String, Double> toNormalizedOtherResources() {
-        Map<String, Double> ret = new HashMap<>();
-        int length = otherResources.length;
-        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
-            int index = entry.getValue();
-            if (index < length) {
-                ret.put(entry.getKey(), otherResources[index]);
-            }
-        }
-        return ret;
+        return "Normalized resources: " + toNormalizedMap();
     }
 
     /**
@@ -223,7 +153,7 @@ public abstract class NormalizedResources {
      * user.
      */
     public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = toNormalizedOtherResources();
+        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
         ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
         return ret;
     }
@@ -240,9 +170,11 @@ public abstract class NormalizedResources {
      * does not check memory because with shared memory it is beyond the scope of this.
      *
      * @param other the resources that we want to check if they would fit in this.
+     * @param thisTotalMemoryMb The total memory in MB of this
+     * @param otherTotalMemoryMb The total memory in MB of other
      * @return true if it might fit, else false if it could not possibly fit.
      */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other) {
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
         if (this.cpu < other.getTotalCpu()) {
             return false;
         }
@@ -253,69 +185,70 @@ public abstract class NormalizedResources {
             }
         }
 
-        if (this.getTotalMemoryMb() < other.getTotalMemoryMb()) {
-            return false;
-        }
-        return true;
+        return thisTotalMemoryMb >= otherTotalMemoryMb;
     }
 
-    private void throwBecauseResourceIsMissingFromTotal(int resourceIndex) {
-        String resourceName = null;
-        for (Map.Entry<String, Integer> entry : resourceNames.entrySet()) {
+    private String getResourceNameForResourceIndex(int resourceIndex) {
+        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
             int index = entry.getValue();
             if (index == resourceIndex) {
-                resourceName = entry.getKey();
-                break;
+                return entry.getKey();
             }
         }
-        if (resourceName == null) {
-            throw new IllegalStateException("Array index " + resourceIndex + " is not mapped in the resource names map."
-                + " This should not be possible, and is likely a bug in the Storm code.");
-        }
-        throw new IllegalArgumentException("Total resources does not contain resource '"
-            + resourceName
-            + "'. All resources should be represented in the total. This is likely a bug in the Storm code");
+        return null;
+    }
+
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
     }
 
     /**
-     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used.
+     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
      *
      * @param used the amount of resources used.
-     * @return the average percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the average percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
      */
-    public double calculateAveragePercentageUsedBy(NormalizedResources used) {
+    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
         int skippedResourceTypes = 0;
         double total = 0.0;
-        double totalMemory = getTotalMemoryMb();
-        if (totalMemory != 0.0) {
-            total += used.getTotalMemoryMb() / totalMemory;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            total += usedMemoryMb / totalMemoryMb;
         } else {
             skippedResourceTypes++;
         }
         double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > getTotalCpu()) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
         if (totalCpu != 0.0) {
             total += used.getTotalCpu() / getTotalCpu();
         } else {
             skippedResourceTypes++;
         }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating avg percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
-                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
-                this.toNormalizedOtherResources(), used.toNormalizedOtherResources());
-        }
 
         if (used.otherResources.length > otherResources.length) {
-            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
 
         for (int i = 0; i < otherResources.length; i++) {
             double totalValue = otherResources[i];
-            if (totalValue == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                skippedResourceTypes++;
-                continue;
-            }
             double usedValue;
             if (i >= used.otherResources.length) {
                 //Resources missing from used are using none of that resource
@@ -323,14 +256,24 @@ public abstract class NormalizedResources {
             } else {
                 usedValue = used.otherResources[i];
             }
+            if (usedValue > totalValue) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            if (totalValue == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                skippedResourceTypes++;
+                continue;
+            }
+
             total += usedValue / totalValue;
         }
         //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
         int divisor = 2 + otherResources.length - skippedResourceTypes;
         if (divisor == 0) {
-            /*This is an arbitrary choice to make the result consistent with calculateMin.
-             Any value would be valid here, becase there are no (non-zero) resources in the total set of resources,
-             so we're trying to average 0 values.
+            /*
+             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
+             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
              */
             return 100.0;
         } else {
@@ -339,41 +282,43 @@ public abstract class NormalizedResources {
     }
 
     /**
-     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used.
+     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
      *
      * @param used the amount of resources used.
-     * @return the minimum percentage used 0.0 to 100.0. Clamps to 100.0 in case there are no available resources in the total.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the minimum percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
      */
-    public double calculateMinPercentageUsedBy(NormalizedResources used) {
-        double totalMemory = getTotalMemoryMb();
-        double totalCpu = getTotalCpu();
-
+    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating min percentage used by. Used CPU: {} Total CPU: {} Used Mem: {} Total Mem: {}"
-                + " Other Used: {} Other Total: {}", totalCpu, used.getTotalCpu(), totalMemory, used.getTotalMemoryMb(),
-                toNormalizedOtherResources(), used.toNormalizedOtherResources());
+            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
         }
 
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseResourceIsMissingFromTotal(used.otherResources.length);
+        double min = 1.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
-
-        if (used.otherResources.length != otherResources.length
-            || totalMemory == 0.0
-            || totalCpu == 0.0) {
-            //If the lengths don't match one of the resources will be 0, which means we would calculate the percentage to be 0.0
-            // and so the min would be 0.0 (assuming that we can never go negative on a resource being used.
-            return 0.0;
+        if (totalMemoryMb != 0.0) {
+            min = Math.min(min, usedMemoryMb / totalMemoryMb);
         }
-
-        double min = 100.0;
-        if (totalMemory != 0.0) {
-            min = Math.min(min, used.getTotalMemoryMb() / totalMemory);
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > totalCpu) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
         }
         if (totalCpu != 0.0) {
             min = Math.min(min, used.getTotalCpu() / totalCpu);
         }
 
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        
         for (int i = 0; i < otherResources.length; i++) {
             if (otherResources[i] == 0.0) {
                 //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
@@ -384,6 +329,9 @@ public abstract class NormalizedResources {
                 //Resources missing from used are using none of that resource
                 return 0;
             }
+            if (used.otherResources[i] > otherResources[i]) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
             min = Math.min(min, used.otherResources[i] / otherResources[i]);
         }
         return min * 100.0;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..640233a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+    NormalizedResources getNormalizedResources();
+
+    double getTotalMemoryMb();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 5350005..db9ca73 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -25,14 +25,11 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
-import org.apache.storm.Constants;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 6c70ff1..2bd2b86 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index dc557ff..e730705 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -30,7 +30,6 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SingleTopologyCluster;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..cf4f80b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    /**
+     * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+     * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+     * this method, as they are expected to be captured elsewhere.
+     *
+     * @param normalizedResources The resources to translate to an array
+     * @return The array of resource values
+     */
+    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
+        for (String key : normalizedResources.keySet()) {
+            //We are going to skip over CPU and Memory, because they are captured elsewhere
+            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+            }
+        }
+        //By default all of the values are 0
+        double[] ret = new double[counter.get()];
+        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+            if (index != null) {
+                //index == null if it is memory or CPU
+                ret[index] = entry.getValue();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Translates an array of resource values to a normalized resource map.
+     *
+     * @param resources The resource array to translate
+     * @return The normalized resource map
+     */
+    public Map<String, Double> translateFromResourceArray(double[] resources) {
+        Map<String, Double> ret = new HashMap<>();
+        int length = resources.length;
+        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+            int index = entry.getValue();
+            if (index < length) {
+                ret.put(entry.getKey(), resources[index]);
+            }
+        }
+        return ret;
+    }
+
+    public Map<String, Integer> getResourceNamesToArrayIndex() {
+        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
new file mode 100644
index 0000000..d59ba92
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Provides resource name normalization for resource maps.
+ */
+public class ResourceNameNormalizer {
+
+    private final Map<String, String> resourceNameMapping;
+
+    public ResourceNameNormalizer() {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+        resourceNameMapping = Collections.unmodifiableMap(tmp);
+    }
+
+    /**
+     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
+     *
+     * @param resourceMap resource map of either Supervisor or Topology
+     * @return the resource map with common resource names
+     */
+    public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+        if (resourceMap == null) {
+            return new HashMap<>();
+        }
+        return new HashMap<>(resourceMap.entrySet().stream()
+            .collect(Collectors.toMap(
+                //Map the key if needed
+                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+                //Map the value
+                (e) -> e.getValue().doubleValue())));
+    }
+
+    public Map<String, String> getResourceNameMapping() {
+        return resourceNameMapping;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index db9dbfa..ca4ea63 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -20,9 +20,7 @@ package org.apache.storm.scheduler.resource;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
@@ -33,8 +31,6 @@ import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class ResourceUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
 
@@ -95,7 +91,8 @@ public class ResourceUtils {
 
                 if (resourceUpdatesMap.containsKey(spoutName)) {
                     ComponentCommon spoutCommon = spoutSpec.get_common();
-                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(spoutName));
+                    Map<String, Double> resourcesUpdate = NormalizedResources
+                        .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(spoutName));
                     String newJsonConf = getJsonWithUpdatedResources(spoutCommon.get_json_conf(), resourcesUpdate);
                     spoutCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(spoutName, resourcesUpdate);
@@ -110,7 +107,8 @@ public class ResourceUtils {
 
                 if (resourceUpdatesMap.containsKey(boltName)) {
                     ComponentCommon boltCommon = boltObj.get_common();
-                    Map<String, Double> resourcesUpdate = normalizedResourceMap(resourceUpdatesMap.get(boltName));
+                    Map<String, Double> resourcesUpdate = NormalizedResources
+                        .RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceUpdatesMap.get(boltName));
                     String newJsonConf = getJsonWithUpdatedResources(boltCommon.get_json_conf(), resourceUpdatesMap.get(boltName));
                     boltCommon.set_json_conf(newJsonConf);
                     componentsUpdated.put(boltName, resourcesUpdate);
@@ -128,7 +126,7 @@ public class ResourceUtils {
     }
 
     public static String getCorrespondingLegacyResourceName(String normalizedResourceName) {
-        for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_MAPPING.entrySet()) {
+        for(Map.Entry<String, String> entry : NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet()) {
             if (entry.getValue().equals(normalizedResourceName)) {
                 return entry.getKey();
             }
@@ -148,7 +146,7 @@ public class ResourceUtils {
                     );
 
             for (Map.Entry<String, Double> resourceUpdateEntry : resourceUpdates.entrySet()) {
-                if (NormalizedResources.RESOURCE_NAME_MAPPING.containsValue(resourceUpdateEntry.getKey())) {
+                if (NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey())) {
                     // if there will be legacy values they will be in the outer conf
                     jsonObject.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));
                     componentResourceMap.remove(getCorrespondingLegacyResourceName(resourceUpdateEntry.getKey()));

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 67bc867..73e1aa0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -19,7 +19,6 @@
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -31,18 +30,15 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
-
-import org.apache.storm.executor.Executor;
 import org.apache.storm.generated.ComponentType;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index e0dc881..efa3ecf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -23,13 +23,11 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.TreeSet;
-
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
-
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 108dc49..2b0ca40 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -21,16 +21,13 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeSet;
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index a64c9b4..6f33251 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -25,28 +25,27 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
-import java.nio.file.Files;
 import java.nio.file.FileSystems;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.jar.JarEntry;
-import java.util.jar.JarFile;
 import java.util.List;
 import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 import javax.security.auth.Subject;
-
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.exec.CommandLine;
@@ -54,16 +53,15 @@ import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
 import org.apache.storm.blobstore.LocalModeClientBlobStore;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.daemon.StormCommon;
-import org.apache.storm.DaemonConfig;
 import org.apache.storm.generated.AccessControl;
 import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.AuthorizationException;
@@ -73,8 +71,8 @@ import org.apache.storm.generated.ReadableBlobMeta;
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index eb5f70f..360db3a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -501,9 +501,6 @@ public class TestResourceAwareScheduler {
     }
 
     public void testHeterogeneousCluster(Config topologyConf, String strategyName) {
-        Map<String, Double> test = new HashMap<>();
-        test.put("gpu.count", 0.0);
-        new NormalizedResourceOffer(test);
         LOG.info("\n\n\t\ttestHeterogeneousCluster");
         INimbus iNimbus = new INimbusTest();
         Map<String, Double> resourceMap1 = new HashMap<>(); // strong supervisor node
@@ -513,8 +510,8 @@ public class TestResourceAwareScheduler {
         resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
         resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
 
-        resourceMap1 = NormalizedResources.normalizedResourceMap(resourceMap1);
-        resourceMap2 = NormalizedResources.normalizedResourceMap(resourceMap2);
+        resourceMap1 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap1);
+        resourceMap2 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap2);
 
         Map<String, SupervisorDetails> supMap = new HashMap<>();
         for (int i = 0; i < 2; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 4a2e644..3d37ad9 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -65,8 +65,6 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.storm.scheduler.resource.NormalizedResources.normalizedResourceMap;
-
 public class TestUtilsForResourceAwareScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
 
@@ -155,7 +153,7 @@ public class TestUtilsForResourceAwareScheduler {
             for (int j = 0; j < numPorts; j++) {
                 ports.add(j);
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, normalizedResourceMap(resourceMap));
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
             retList.put(sup.getId(), sup);
         }
         return retList;

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
new file mode 100644
index 0000000..1f78f53
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class NormalizedResourcesRule implements TestRule {
+
+    public static class NRStatement extends Statement {
+        private final Statement base;
+
+        public NRStatement(Statement base) {
+            this.base = base;
+        }
+
+        @Override
+        public void evaluate() throws Throwable {
+            NormalizedResources.resetResourceNames();
+            try {
+                base.evaluate();
+            } finally {
+                NormalizedResources.resetResourceNames();
+            }
+        }
+    }
+
+    @Override
+    public Statement apply(Statement statement, Description description) {
+        return new NRStatement(statement);
+    }
+}


[5/6] storm git commit: Move some resource normalization classes to a new package since the resource package was getting crowded

Posted by bo...@apache.org.
Move some resource normalization classes to a new package since the resource package was getting crowded


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88533346
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88533346
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88533346

Branch: refs/heads/master
Commit: 88533346dbfb318d8bf623703f6a16a1eab19534
Parents: 41db23f
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jan 7 18:12:55 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:43:11 2018 +0100

----------------------------------------------------------------------
 storm-server/pom.xml                            |   2 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   2 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |   2 +-
 .../org/apache/storm/scheduler/Cluster.java     |   6 +-
 .../storm/scheduler/ISchedulingState.java       |   5 +-
 .../storm/scheduler/SupervisorDetails.java      |   2 +-
 .../apache/storm/scheduler/TopologyDetails.java |   2 +-
 .../resource/NormalizedResourceOffer.java       | 114 ------
 .../resource/NormalizedResourceRequest.java     | 190 ----------
 .../scheduler/resource/NormalizedResources.java | 339 ------------------
 .../resource/NormalizedResourcesWithMemory.java |  28 --
 .../storm/scheduler/resource/RAS_Node.java      |   1 +
 .../resource/ResourceMapArrayBridge.java        |  89 -----
 .../resource/ResourceNameNormalizer.java        |  65 ----
 .../storm/scheduler/resource/ResourceUtils.java |   2 +
 .../normalization/NormalizedResourceOffer.java  | 114 ++++++
 .../NormalizedResourceRequest.java              | 190 ++++++++++
 .../normalization/NormalizedResources.java      | 343 +++++++++++++++++++
 .../NormalizedResourcesWithMemory.java          |  28 ++
 .../normalization/ResourceMapArrayBridge.java   |  89 +++++
 .../normalization/ResourceNameNormalizer.java   |  68 ++++
 .../scheduling/BaseResourceAwareStrategy.java   |   2 +-
 .../org/apache/storm/utils/ServerUtils.java     |   2 +-
 .../resource/TestResourceAwareScheduler.java    |   1 +
 .../TestUtilsForResourceAwareScheduler.java     |   1 +
 .../normalization/NormalizedResourcesRule.java  |   1 -
 .../normalization/NormalizedResourcesTest.java  |   1 -
 .../ResourceMapArrayBridgeTest.java             |   2 -
 28 files changed, 849 insertions(+), 842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index ff917ca..7c301a4 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2620</maxAllowedViolations>
+                    <maxAllowedViolations>2617</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 2c785a5..e66dbe5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -150,7 +150,7 @@ import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAuthorizer;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 3faf1e4..2be241a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -27,7 +27,7 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1ed88c7..625fe6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -35,9 +35,9 @@ import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 7175b7e..187a03c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -22,11 +22,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 
 /** An interface that provides access to the current scheduling state. */
 public interface ISchedulingState {

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 3d08715..242b54c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.storm.Constants;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 8e4c1d5..41e7edf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -35,7 +35,7 @@ import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
deleted file mode 100644
index 787bbce..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Map;
-import org.apache.storm.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An offer of resources with normalized resource names.
- */
-public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
-    private final NormalizedResources normalizedResources;
-    private double totalMemoryMb;
-
-    /**
-     * Create a new normalized resource offer.
-     *
-     * @param resources the resources to be normalized.
-     */
-    public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
-        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
-        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
-        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
-    }
-
-    public NormalizedResourceOffer() {
-        this((Map<String, ? extends Number>) null);
-    }
-
-    public NormalizedResourceOffer(NormalizedResourceOffer other) {
-        this.totalMemoryMb = other.totalMemoryMb;
-        this.normalizedResources = new NormalizedResources(other.normalizedResources);
-    }
-
-    @Override
-    public double getTotalMemoryMb() {
-        return totalMemoryMb;
-    }
-
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = normalizedResources.toNormalizedMap();
-        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
-        return ret;
-    }
-
-    public void add(NormalizedResourcesWithMemory other) {
-        normalizedResources.add(other.getNormalizedResources());
-        totalMemoryMb += other.getTotalMemoryMb();
-    }
-
-    public void remove(NormalizedResourcesWithMemory other) {
-        normalizedResources.remove(other.getNormalizedResources());
-        totalMemoryMb -= other.getTotalMemoryMb();
-        if (totalMemoryMb < 0.0) {
-            normalizedResources.throwBecauseResourceBecameNegative(
-                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
-        };
-    }
-
-    /**
-     * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
-     * double, double).
-     */
-    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
-        return normalizedResources.calculateAveragePercentageUsedBy(
-            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
-    }
-
-    /**
-     * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
-     * double)
-     */
-    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
-        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
-    }
-
-    /**
-     * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
-     * double).
-     */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
-        return normalizedResources.couldHoldIgnoringSharedMemory(
-            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
-    }
-
-    public double getTotalCpu() {
-        return normalizedResources.getTotalCpu();
-    }
-
-    @Override
-    public NormalizedResources getNormalizedResources() {
-        return normalizedResources;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
deleted file mode 100644
index 2af90ab..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.utils.ObjectReader;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A resource request with normalized resource names.
- */
-public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
-
-    private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
-        if (!dest.containsKey(destKey)) {
-            Number value = (Number) src.get(srcKey);
-            if (value != null) {
-                dest.put(destKey, value.doubleValue());
-            }
-        }
-    }
-
-    private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
-        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
-            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
-        putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
-        putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
-        putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-        return ret;
-    }
-
-    private static Map<String, Double> parseResources(String input) {
-        Map<String, Double> topologyResources = new HashMap<>();
-        JSONParser parser = new JSONParser();
-        LOG.debug("Input to parseResources {}", input);
-        try {
-            if (input != null) {
-                Object obj = parser.parse(input);
-                JSONObject jsonObject = (JSONObject) obj;
-
-                // Legacy resource parsing
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
-                    Double topoMemOnHeap = ObjectReader
-                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
-                }
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
-                    Double topoMemOffHeap = ObjectReader
-                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
-                }
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
-                        null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
-                }
-
-                // If resource is also present in resources map will overwrite the above
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
-                    Map<String, Number> rawResourcesMap =
-                        (Map<String, Number>) jsonObject.computeIfAbsent(
-                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
-
-                    for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
-                        topologyResources.put(
-                            stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
-                    }
-
-                }
-            }
-        } catch (ParseException e) {
-            LOG.error("Failed to parse component resources is:" + e.toString(), e);
-            return null;
-        }
-        return topologyResources;
-    }
-
-    private final NormalizedResources normalizedResources;
-    private double onHeap;
-    private double offHeap;
-
-    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-        Map<String, Double> defaultResources) {
-        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
-        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
-        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        normalizedResources = new NormalizedResources(normalizedResourceMap);
-    }
-
-    public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
-        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest(Map<String, Object> topoConf) {
-        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest() {
-        this((Map<String, ? extends Number>) null, null);
-    }
-
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
-        ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
-        ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
-        return ret;
-    }
-
-    public double getOnHeapMemoryMb() {
-        return onHeap;
-    }
-
-    public void addOnHeap(final double onHeap) {
-        this.onHeap += onHeap;
-    }
-
-    public double getOffHeapMemoryMb() {
-        return offHeap;
-    }
-
-    public void addOffHeap(final double offHeap) {
-        this.offHeap += offHeap;
-    }
-
-    /**
-     * Add the resources in other to this.
-     *
-     * @param other the other Request to add to this.
-     */
-    public void add(NormalizedResourceRequest other) {
-        this.normalizedResources.add(other.normalizedResources);
-        onHeap += other.onHeap;
-        offHeap += other.offHeap;
-    }
-
-    public void add(WorkerResources value) {
-        this.normalizedResources.add(value);
-        //The resources are already normalized
-        Map<String, Double> resources = value.get_resources();
-        onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-    }
-
-    @Override
-    public double getTotalMemoryMb() {
-        return onHeap + offHeap;
-    }
-
-    @Override
-    public String toString() {
-        return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
-    }
-
-    public double getTotalCpu() {
-        return this.normalizedResources.getTotalCpu();
-    }
-
-    @Override
-    public NormalizedResources getNormalizedResources() {
-        return this.normalizedResources;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
deleted file mode 100644
index f8e911a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
- * does not keep track of memory as a resource.
- */
-public class NormalizedResources {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-
-    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
-    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
-
-    static {
-        resetResourceNames();
-    }
-
-    private double cpu;
-    private double[] otherResources;
-
-    /**
-     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
-     * algorithms sadly have different behavior if a resource exists or not.
-     */
-    @VisibleForTesting
-    public static void resetResourceNames() {
-        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
-        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
-    }
-
-    /**
-     * Copy constructor.
-     */
-    public NormalizedResources(NormalizedResources other) {
-        cpu = other.cpu;
-        otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
-    }
-
-    /**
-     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
-     * offers because of how on heap vs off heap is used.
-     *
-     * @param normalizedResources the normalized resource map
-     * @param getTotalMemoryMb Supplier of total memory in MB.
-     */
-    public NormalizedResources(Map<String, Double> normalizedResources) {
-        cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
-    }
-
-    /**
-     * Get the total amount of cpu.
-     *
-     * @return the amount of cpu.
-     */
-    public double getTotalCpu() {
-        return cpu;
-    }
-    
-    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
-        if (requiredLength > otherResources.length) {
-            double[] newResources = new double[requiredLength];
-            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
-            otherResources = newResources;
-        }
-    }
-
-    private void add(double[] resourceArray) {
-        int otherLength = resourceArray.length;
-        zeroPadOtherResourcesIfNecessary(otherLength);
-        for (int i = 0; i < otherLength; i++) {
-            otherResources[i] += resourceArray[i];
-        }
-    }
-
-    public void add(NormalizedResources other) {
-        this.cpu += other.cpu;
-        add(other.otherResources);
-    }
-
-    /**
-     * Add the resources from a worker to this.
-     *
-     * @param value the worker resources that should be added to this.
-     */
-    public void add(WorkerResources value) {
-        Map<String, Double> workerNormalizedResources = value.get_resources();
-        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
-    }
-
-    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
-        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
-            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
-            resourceName, currentValue, subtractedValue));
-    }
-    
-    /**
-     * Remove the other resources from this. This is the same as subtracting the resources in other from this.
-     *
-     * @param other the resources we want removed.
-     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
-     */
-    public void remove(NormalizedResources other) {
-        this.cpu -= other.cpu;
-        if (cpu < 0.0) {
-            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
-        }
-        int otherLength = other.otherResources.length;
-        zeroPadOtherResourcesIfNecessary(otherLength);
-        for (int i = 0; i < otherLength; i++) {
-            otherResources[i] -= other.otherResources[i];
-            if (otherResources[i] < 0.0) {
-                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Normalized resources: " + toNormalizedMap();
-    }
-
-    /**
-     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
-     * user.
-     */
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
-        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
-        return ret;
-    }
-
-    private double getResourceAt(int index) {
-        if (index >= otherResources.length) {
-            return 0.0;
-        }
-        return otherResources[index];
-    }
-
-    /**
-     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
-     * does not check memory because with shared memory it is beyond the scope of this.
-     *
-     * @param other the resources that we want to check if they would fit in this.
-     * @param thisTotalMemoryMb The total memory in MB of this
-     * @param otherTotalMemoryMb The total memory in MB of other
-     * @return true if it might fit, else false if it could not possibly fit.
-     */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
-        if (this.cpu < other.getTotalCpu()) {
-            return false;
-        }
-        int length = Math.max(this.otherResources.length, other.otherResources.length);
-        for (int i = 0; i < length; i++) {
-            if (getResourceAt(i) < other.getResourceAt(i)) {
-                return false;
-            }
-        }
-
-        return thisTotalMemoryMb >= otherTotalMemoryMb;
-    }
-
-    private String getResourceNameForResourceIndex(int resourceIndex) {
-        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
-            int index = entry.getValue();
-            if (index == resourceIndex) {
-                return entry.getKey();
-            }
-        }
-        return null;
-    }
-
-    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
-            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
-    }
-
-    /**
-     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
-     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
-     * division by 0. If all resources are skipped the result is defined to be 100.0.
-     *
-     * @param used the amount of resources used.
-     * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
-     * @return the average percentage used 0.0 to 100.0.
-     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
-     */
-    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
-        }
-
-        int skippedResourceTypes = 0;
-        double total = 0.0;
-        if (usedMemoryMb > totalMemoryMb) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalMemoryMb != 0.0) {
-            total += usedMemoryMb / totalMemoryMb;
-        } else {
-            skippedResourceTypes++;
-        }
-        double totalCpu = getTotalCpu();
-        if (used.getTotalCpu() > getTotalCpu()) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalCpu != 0.0) {
-            total += used.getTotalCpu() / getTotalCpu();
-        } else {
-            skippedResourceTypes++;
-        }
-
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-
-        for (int i = 0; i < otherResources.length; i++) {
-            double totalValue = otherResources[i];
-            double usedValue;
-            if (i >= used.otherResources.length) {
-                //Resources missing from used are using none of that resource
-                usedValue = 0.0;
-            } else {
-                usedValue = used.otherResources[i];
-            }
-            if (usedValue > totalValue) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-            }
-            if (totalValue == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                skippedResourceTypes++;
-                continue;
-            }
-
-            total += usedValue / totalValue;
-        }
-        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
-        int divisor = 2 + otherResources.length - skippedResourceTypes;
-        if (divisor == 0) {
-            /*
-             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
-             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
-             */
-            return 100.0;
-        } else {
-            return (total * 100.0) / divisor;
-        }
-    }
-
-    /**
-     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
-     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
-     * division by 0. If all resources are skipped the result is defined to be 100.0.
-     *
-     * @param used the amount of resources used.
-     * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
-     * @return the minimum percentage used 0.0 to 100.0.
-     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
-     */
-    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
-        }
-
-        double min = 1.0;
-        if (usedMemoryMb > totalMemoryMb) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalMemoryMb != 0.0) {
-            min = Math.min(min, usedMemoryMb / totalMemoryMb);
-        }
-        double totalCpu = getTotalCpu();
-        if (used.getTotalCpu() > totalCpu) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalCpu != 0.0) {
-            min = Math.min(min, used.getTotalCpu() / totalCpu);
-        }
-
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        
-        for (int i = 0; i < otherResources.length; i++) {
-            if (otherResources[i] == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                continue;
-            }
-            if (i >= used.otherResources.length) {
-                //Resources missing from used are using none of that resource
-                return 0;
-            }
-            if (used.otherResources[i] > otherResources[i]) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-            }
-            min = Math.min(min, used.otherResources[i] / otherResources[i]);
-        }
-        return min * 100.0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
deleted file mode 100644
index 640233a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-/**
- * Intended for {@link NormalizedResources} wrappers that handle memory.
- */
-public interface NormalizedResourcesWithMemory {
-
-    NormalizedResources getNormalizedResources();
-
-    double getTotalMemoryMb();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index db9ca73..84a95a7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
deleted file mode 100644
index cf4f80b..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.Constants;
-
-/**
- * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
- * full normalized resource map as an optimization. See {@link NormalizedResources}.
- */
-public class ResourceMapArrayBridge {
-
-    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
-    private final AtomicInteger counter = new AtomicInteger(0);
-
-    /**
-     * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
-     * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
-     * this method, as they are expected to be captured elsewhere.
-     *
-     * @param normalizedResources The resources to translate to an array
-     * @return The array of resource values
-     */
-    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
-        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
-        for (String key : normalizedResources.keySet()) {
-            //We are going to skip over CPU and Memory, because they are captured elsewhere
-            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
-                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
-            }
-        }
-        //By default all of the values are 0
-        double[] ret = new double[counter.get()];
-        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
-            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
-            if (index != null) {
-                //index == null if it is memory or CPU
-                ret[index] = entry.getValue();
-            }
-        }
-        return ret;
-    }
-
-    /**
-     * Translates an array of resource values to a normalized resource map.
-     *
-     * @param resources The resource array to translate
-     * @return The normalized resource map
-     */
-    public Map<String, Double> translateFromResourceArray(double[] resources) {
-        Map<String, Double> ret = new HashMap<>();
-        int length = resources.length;
-        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
-            int index = entry.getValue();
-            if (index < length) {
-                ret.put(entry.getKey(), resources[index]);
-            }
-        }
-        return ret;
-    }
-
-    public Map<String, Integer> getResourceNamesToArrayIndex() {
-        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
deleted file mode 100644
index d59ba92..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-
-/**
- * Provides resource name normalization for resource maps.
- */
-public class ResourceNameNormalizer {
-
-    private final Map<String, String> resourceNameMapping;
-
-    public ResourceNameNormalizer() {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
-        resourceNameMapping = Collections.unmodifiableMap(tmp);
-    }
-
-    /**
-     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
-     *
-     * @param resourceMap resource map of either Supervisor or Topology
-     * @return the resource map with common resource names
-     */
-    public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
-        if (resourceMap == null) {
-            return new HashMap<>();
-        }
-        return new HashMap<>(resourceMap.entrySet().stream()
-            .collect(Collectors.toMap(
-                //Map the key if needed
-                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
-                //Map the value
-                (e) -> e.getValue().doubleValue())));
-    }
-
-    public Map<String, String> getResourceNameMapping() {
-        return resourceNameMapping;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index ca4ea63..2f9ab4c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -25,6 +25,8 @@ import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
new file mode 100644
index 0000000..417dca9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An offer of resources with normalized resource names.
+ */
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
+    private final NormalizedResources normalizedResources;
+    private double totalMemoryMb;
+
+    /**
+     * Create a new normalized resource offer.
+     *
+     * @param resources the resources to be normalized.
+     */
+    public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
+    }
+
+    public NormalizedResourceOffer() {
+        this((Map<String, ? extends Number>) null);
+    }
+
+    public NormalizedResourceOffer(NormalizedResourceOffer other) {
+        this.totalMemoryMb = other.totalMemoryMb;
+        this.normalizedResources = new NormalizedResources(other.normalizedResources);
+    }
+
+    @Override
+    public double getTotalMemoryMb() {
+        return totalMemoryMb;
+    }
+
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+        return ret;
+    }
+
+    public void add(NormalizedResourcesWithMemory other) {
+        normalizedResources.add(other.getNormalizedResources());
+        totalMemoryMb += other.getTotalMemoryMb();
+    }
+
+    public void remove(NormalizedResourcesWithMemory other) {
+        normalizedResources.remove(other.getNormalizedResources());
+        totalMemoryMb -= other.getTotalMemoryMb();
+        if (totalMemoryMb < 0.0) {
+            normalizedResources.throwBecauseResourceBecameNegative(
+                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+        };
+    }
+
+    /**
+     * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+     * double, double).
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateAveragePercentageUsedBy(
+            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double)
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double).
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+        return normalizedResources.couldHoldIgnoringSharedMemory(
+            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+    }
+
+    public double getTotalCpu() {
+        return normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return normalizedResources;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
new file mode 100644
index 0000000..6627bb5
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource request with normalized resource names.
+ */
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+    private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
+        if (!dest.containsKey(destKey)) {
+            Number value = (Number) src.get(srcKey);
+            if (value != null) {
+                dest.put(destKey, value.doubleValue());
+            }
+        }
+    }
+
+    private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
+        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+        putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        return ret;
+    }
+
+    private static Map<String, Double> parseResources(String input) {
+        Map<String, Double> topologyResources = new HashMap<>();
+        JSONParser parser = new JSONParser();
+        LOG.debug("Input to parseResources {}", input);
+        try {
+            if (input != null) {
+                Object obj = parser.parse(input);
+                JSONObject jsonObject = (JSONObject) obj;
+
+                // Legacy resource parsing
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+                    Double topoMemOnHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+                    Double topoMemOffHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+                        null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+                }
+
+                // If a resource is also present in the resources map, that value overwrites the legacy value parsed above
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+                    Map<String, Number> rawResourcesMap =
+                        (Map<String, Number>) jsonObject.computeIfAbsent(
+                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+
+                    for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
+                        topologyResources.put(
+                            stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
+                    }
+
+                }
+            }
+        } catch (ParseException e) {
+            LOG.error("Failed to parse component resources is:" + e.toString(), e);
+            return null;
+        }
+        return topologyResources;
+    }
+
+    private final NormalizedResources normalizedResources;
+    private double onHeap;
+    private double offHeap;
+
+    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+        Map<String, Double> defaultResources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        normalizedResources = new NormalizedResources(normalizedResourceMap);
+    }
+
+    public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
+        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
+    }
+
+    public NormalizedResourceRequest(Map<String, Object> topoConf) {
+        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
+    }
+
+    public NormalizedResourceRequest() {
+        this((Map<String, ? extends Number>) null, null);
+    }
+
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+        ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+        return ret;
+    }
+
+    public double getOnHeapMemoryMb() {
+        return onHeap;
+    }
+
+    public void addOnHeap(final double onHeap) {
+        this.onHeap += onHeap;
+    }
+
+    public double getOffHeapMemoryMb() {
+        return offHeap;
+    }
+
+    public void addOffHeap(final double offHeap) {
+        this.offHeap += offHeap;
+    }
+
+    /**
+     * Add the resources in other to this.
+     *
+     * @param other the other Request to add to this.
+     */
+    public void add(NormalizedResourceRequest other) {
+        this.normalizedResources.add(other.normalizedResources);
+        onHeap += other.onHeap;
+        offHeap += other.offHeap;
+    }
+
+    public void add(WorkerResources value) {
+        this.normalizedResources.add(value);
+        //The resources are already normalized
+        Map<String, Double> resources = value.get_resources();
+        onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+    }
+
+    @Override
+    public double getTotalMemoryMb() {
+        return onHeap + offHeap;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+    }
+
+    public double getTotalCpu() {
+        return this.normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return this.normalizedResources;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
new file mode 100644
index 0000000..76d5ce2
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
+ */
+public class NormalizedResources {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
+
+    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
+
+    static {
+        resetResourceNames();
+    }
+
+    private double cpu;
+    private double[] otherResources;
+
+    /**
+     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
+     * algorithms sadly have different behavior if a resource exists or not.
+     */
+    @VisibleForTesting
+    public static void resetResourceNames() {
+        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
+    }
+
+    /**
+     * Copy constructor.
+     */
+    public NormalizedResources(NormalizedResources other) {
+        cpu = other.cpu;
+        otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+    }
+
+    /**
+     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+     * offers because of how on heap vs off heap is used.
+     *
+     * @param normalizedResources the normalized resource map
+     */
+    public NormalizedResources(Map<String, Double> normalizedResources) {
+        cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
+    }
+
+    /**
+     * Get the total amount of cpu.
+     *
+     * @return the amount of cpu.
+     */
+    public double getTotalCpu() {
+        return cpu;
+    }
+    
+    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+        if (requiredLength > otherResources.length) {
+            double[] newResources = new double[requiredLength];
+            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+            otherResources = newResources;
+        }
+    }
+
+    private void add(double[] resourceArray) {
+        int otherLength = resourceArray.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] += resourceArray[i];
+        }
+    }
+
+    public void add(NormalizedResources other) {
+        this.cpu += other.cpu;
+        add(other.otherResources);
+    }
+
+    /**
+     * Add the resources from a worker to this.
+     *
+     * @param value the worker resources that should be added to this.
+     */
+    public void add(WorkerResources value) {
+        Map<String, Double> workerNormalizedResources = value.get_resources();
+        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
+    }
+
+    /**
+     * Throw an IllegalArgumentException because a resource became negative during remove.
+     * @param resourceName The name of the resource that became negative
+     * @param currentValue The current value of the resource
+     * @param subtractedValue The value that was subtracted to make the resource negative
+     */
+    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+            resourceName, currentValue, subtractedValue));
+    }
+    
+    /**
+     * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+     *
+     * @param other the resources we want removed.
+     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
+     */
+    public void remove(NormalizedResources other) {
+        this.cpu -= other.cpu;
+        if (cpu < 0.0) {
+            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
+        }
+        int otherLength = other.otherResources.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] -= other.otherResources[i];
+            if (otherResources[i] < 0.0) {
+                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Normalized resources: " + toNormalizedMap();
+    }
+
+    /**
+     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+     * user.
+     */
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
+        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        return ret;
+    }
+
+    private double getResourceAt(int index) {
+        if (index >= otherResources.length) {
+            return 0.0;
+        }
+        return otherResources[index];
+    }
+
+    /**
+     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other. Total memory is
+     * compared directly, but shared memory is not taken into account because handling it is beyond the scope of this class.
+     *
+     * @param other the resources that we want to check if they would fit in this.
+     * @param thisTotalMemoryMb The total memory in MB of this
+     * @param otherTotalMemoryMb The total memory in MB of other
+     * @return true if it might fit, else false if it could not possibly fit.
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
+        if (this.cpu < other.getTotalCpu()) {
+            return false;
+        }
+        int length = Math.max(this.otherResources.length, other.otherResources.length);
+        for (int i = 0; i < length; i++) {
+            if (getResourceAt(i) < other.getResourceAt(i)) {
+                return false;
+            }
+        }
+
+        return thisTotalMemoryMb >= otherTotalMemoryMb;
+    }
+
+    private String getResourceNameForResourceIndex(int resourceIndex) {
+        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
+            int index = entry.getValue();
+            if (index == resourceIndex) {
+                return entry.getKey();
+            }
+        }
+        return null;
+    }
+
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+    }
+
+    /**
+     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
+     *
+     * @param used the amount of resources used.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the average percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
+        int skippedResourceTypes = 0;
+        double total = 0.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            total += usedMemoryMb / totalMemoryMb;
+        } else {
+            skippedResourceTypes++;
+        }
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > getTotalCpu()) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalCpu != 0.0) {
+            total += used.getTotalCpu() / getTotalCpu();
+        } else {
+            skippedResourceTypes++;
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+
+        for (int i = 0; i < otherResources.length; i++) {
+            double totalValue = otherResources[i];
+            double usedValue;
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                usedValue = 0.0;
+            } else {
+                usedValue = used.otherResources[i];
+            }
+            if (usedValue > totalValue) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            if (totalValue == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                skippedResourceTypes++;
+                continue;
+            }
+
+            total += usedValue / totalValue;
+        }
+        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+        int divisor = 2 + otherResources.length - skippedResourceTypes;
+        if (divisor == 0) {
+            /*
+             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, because there are
+             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
+             */
+            return 100.0;
+        } else {
+            return (total * 100.0) / divisor;
+        }
+    }
+
+    /**
+     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
+     *
+     * @param used the amount of resources used.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the minimum percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
+        double min = 1.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            min = Math.min(min, usedMemoryMb / totalMemoryMb);
+        }
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > totalCpu) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalCpu != 0.0) {
+            min = Math.min(min, used.getTotalCpu() / totalCpu);
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        
+        for (int i = 0; i < otherResources.length; i++) {
+            if (otherResources[i] == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                continue;
+            }
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                return 0;
+            }
+            if (used.otherResources[i] > otherResources[i]) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            min = Math.min(min, used.otherResources[i] / otherResources[i]);
+        }
+        return min * 100.0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..5001645
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+/**
+ * Implemented by {@link NormalizedResources} wrappers that handle memory: exposes the wrapped resources and the total memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+    NormalizedResources getNormalizedResources();
+
+    double getTotalMemoryMb();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..2b07c65
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Translates between normalized resource maps and resource value arrays, which some operations use instead of the full normalized
+ * resource map as an optimization. See {@link NormalizedResources}. Each bridge keeps its own mapping from resource name to array index.
+ */
+public class ResourceMapArrayBridge {
+
+    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    /**
+     * Translates a normalized resource map to an array of resource values. Every resource name is assigned an array index the first time
+     * it is seen, and keeps that index for all later invocations on this bridge. Note that CPU and memory resources are not translated by
+     * this method, as they are expected to be captured elsewhere.
+     *
+     * @param normalizedResources The resources to translate to an array
+     * @return The array of resource values, long enough for every index assigned so far, with 0 for names missing from the map
+     */
+    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+        //Pass 1 registers an index for any new name; pass 2 fills the array. Two passes avoid locking, and the map should be small
+        for (String key : normalizedResources.keySet()) {
+            //CPU and the three memory keys never get an index, because they are captured elsewhere
+            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+            }
+        }
+        //Sized from the counter so every index handed out so far fits; slots not set below keep the default 0
+        double[] ret = new double[counter.get()];
+        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+            if (index != null) {
+                //index == null if it is memory or CPU
+                ret[index] = entry.getValue();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Translates an array of resource values to a normalized resource map. Names whose index is past the end of the array are left out.
+     *
+     * @param resources The resource array to translate
+     * @return The normalized resource map, with one entry per known resource name that the array covers
+     */
+    public Map<String, Double> translateFromResourceArray(double[] resources) {
+        Map<String, Double> ret = new HashMap<>();
+        int length = resources.length;
+        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+            int index = entry.getValue();
+            if (index < length) {
+                ret.put(entry.getKey(), resources[index]);
+            }
+        }
+        return ret;
+    }
+
+    public Map<String, Integer> getResourceNamesToArrayIndex() {
+        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+    }
+
+}


[6/6] storm git commit: Merge branch 'STORM-2859' of https://github.com/srdo/storm into STORM-2859

Posted by bo...@apache.org.
Merge branch 'STORM-2859' of https://github.com/srdo/storm into STORM-2859

STORM-2859: Fix a number of issues with NormalizedResources when resource totals are zero

This closes #2485


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8dd1f7e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8dd1f7e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8dd1f7e

Branch: refs/heads/master
Commit: e8dd1f7e2df5e034a20573b89cb89e9f3529e373
Parents: 7ecb3d7 8853334
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 8 15:02:55 2018 -0600
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 8 15:02:55 2018 -0600

----------------------------------------------------------------------
 storm-server/pom.xml                            |   2 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |   6 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   5 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |  16 +-
 .../org/apache/storm/scheduler/Cluster.java     |   9 +-
 .../storm/scheduler/ISchedulingState.java       |   5 +-
 .../storm/scheduler/SupervisorDetails.java      |   2 +-
 .../apache/storm/scheduler/TopologyDetails.java |   2 +-
 .../resource/NormalizedResourceOffer.java       |  89 ----
 .../resource/NormalizedResourceRequest.java     | 194 ---------
 .../scheduler/resource/NormalizedResources.java | 304 --------------
 .../storm/scheduler/resource/RAS_Node.java      |   4 +-
 .../storm/scheduler/resource/RAS_Nodes.java     |   1 -
 .../resource/ResourceAwareScheduler.java        |   1 -
 .../storm/scheduler/resource/ResourceUtils.java |  16 +-
 .../normalization/NormalizedResourceOffer.java  | 114 ++++++
 .../NormalizedResourceRequest.java              | 190 +++++++++
 .../normalization/NormalizedResources.java      | 343 ++++++++++++++++
 .../NormalizedResourcesWithMemory.java          |  28 ++
 .../normalization/ResourceMapArrayBridge.java   |  89 ++++
 .../normalization/ResourceNameNormalizer.java   |  68 ++++
 .../scheduling/BaseResourceAwareStrategy.java   |   6 +-
 .../DefaultResourceAwareStrategy.java           |  15 +-
 .../GenericResourceAwareStrategy.java           |   3 -
 .../org/apache/storm/utils/ServerUtils.java     |  16 +-
 .../resource/TestResourceAwareScheduler.java    |   8 +-
 .../TestUtilsForResourceAwareScheduler.java     |   5 +-
 .../normalization/NormalizedResourcesRule.java  |  49 +++
 .../normalization/NormalizedResourcesTest.java  | 403 +++++++++++++++++++
 .../ResourceMapArrayBridgeTest.java             |  63 +++
 .../scheduling/NormalizedResourcesRule.java     |  50 ---
 .../TestDefaultResourceAwareStrategy.java       |  40 +-
 .../TestGenericResourceAwareStrategy.java       |   4 +-
 33 files changed, 1422 insertions(+), 728 deletions(-)
----------------------------------------------------------------------



[2/6] storm git commit: Split up NormalizedResources into a few classes with more narrow responsibilities. Make NormalizedResources a regular class instead of an abstract class. Add some tests. Make calculateAvg/Min throw exceptions if the resource total is not a superset of the used resources

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
new file mode 100644
index 0000000..d25e76c
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
@@ -0,0 +1,404 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.NormalizedResources;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import org.apache.storm.Constants;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class NormalizedResourcesTest {
+    //Covers add, remove, couldHoldIgnoringSharedMemory and the average/minimum percentage-used calculations
+    @Rule
+    public NormalizedResourcesRule normalizedResourcesRule = new NormalizedResourcesRule();
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    
+    private final String gpuResourceName = "gpu";
+    
+    private Map<String, Double> normalize(Map<String, ? extends Number> resources) {
+        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+    }
+    
+    @Test
+    public void testAddCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        
+        resources.add(addedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(Constants.COMMON_CPU_RESOURCE_NAME), is(2.0));
+        assertThat(resources.getTotalCpu(), is(2.0));
+    }
+    
+    @Test
+    public void testAddToExistingResource() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        
+        resources.add(addedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(gpuResourceName), is(2.0));
+    }
+    
+    @Test
+    public void testAddWhenOtherHasMoreResourcesThanThis() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+        NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        //A resource that the receiving instance does not hold yet must still show up in the result
+        resources.add(addedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(gpuResourceName), is(1.0));
+    }
+    
+    @Test
+    public void testAddWhenOtherHasDifferentResourceThanThis() {
+        String disks = "disks";
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(disks, 23)));
+        NormalizedResources addedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        
+        resources.add(addedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(disks), is(23.0));
+        assertThat(normalizedMap.get(gpuResourceName), is(1.0));
+    }
+    
+    @Test
+    public void testRemoveThrowsIfResourcesBecomeNegative() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 2)));
+        
+        expectedException.expect(IllegalArgumentException.class);
+        resources.remove(removedResources);
+    }
+    
+    @Test
+    public void testRemoveThrowsIfCpuBecomesNegative() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        
+        expectedException.expect(IllegalArgumentException.class);
+        resources.remove(removedResources);
+    }
+    
+    @Test
+    public void testRemoveFromCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        
+        resources.remove(removedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(Constants.COMMON_CPU_RESOURCE_NAME), is(1.0));
+        assertThat(resources.getTotalCpu(), is(1.0));
+    }
+    
+    @Test
+    public void testRemoveFromExistingResources() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 15)));
+        NormalizedResources removedResources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        
+        resources.remove(removedResources);
+        
+        Map<String, Double> normalizedMap = resources.toNormalizedMap();
+        assertThat(normalizedMap.get(gpuResourceName), is(14.0));
+    }
+
+    @Test
+    public void testCouldHoldWithTooFewCpus() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        
+        boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+        
+        assertThat(couldHold, is(false));
+    }
+    
+    @Test
+    public void testCouldHoldWithTooFewResource() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 2)));
+        
+        boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+        
+        assertThat(couldHold, is(false));
+    }
+    
+    @Test
+    public void testCouldHoldWithTooLittleMemory() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        //NOTE(review): presumably 100 is the available memory and 200 the requested memory; confirm against couldHoldIgnoringSharedMemory
+        boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 200);
+        
+        assertThat(couldHold, is(false));
+    }
+    
+    @Test
+    public void testCouldHoldWithMissingResource() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+        NormalizedResources resourcesToCheck = new NormalizedResources(normalize(Collections.singletonMap(gpuResourceName, 1)));
+        
+        boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 1);
+        
+        assertThat(couldHold, is(false));
+    }
+    
+    @Test
+    public void testCouldHoldWithEnoughResources() {
+        Map<String, Double> allResources = new HashMap<>();
+        allResources.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResources.put(gpuResourceName, 2.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResources));
+        NormalizedResources resourcesToCheck = new NormalizedResources(normalize(allResources));
+        
+        boolean couldHold = resources.couldHoldIgnoringSharedMemory(resourcesToCheck, 100, 100);
+        
+        assertThat(couldHold, is(true));
+    }
+    
+    @Test
+    public void testCalculateAvgUsageWithNoResourcesInTotal() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.emptyMap()));
+        //Nothing in the total and nothing used: the expected answer is 100%
+        double avg = resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+        
+        assertThat(avg, is(100.0));
+    }
+    
+    @Test
+    public void testCalculateAvgWithOnlyCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        
+        double avg = resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+        
+        assertThat(avg, is(50.0));
+    }
+    
+    @Test
+    public void testCalculateAvgWithCpuAndMem() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        //cpu is 1 of 2 (50%) and memory is 1 of 4 (25%); the trailing arguments are total memory followed by used memory
+        double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+        
+        assertThat(avg, is((50.0 + 25.0)/2));
+    }
+    
+    @Test
+    public void testCalculateAvgWithCpuMemAndGenericResource() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 10.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //cpu is 1 of 2 (50%), memory is 1 of 4 (25%) and gpu is 1 of 10 (10%)
+        double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+        
+        assertThat(avg, is((50.0 + 25.0 + 10.0)/3));
+    }
+    
+    @Test
+    public void testCalculateAvgWithUnusedResource() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 10.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        
+        double avg = resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);
+        
+        //gpu is in the total but unused, so it contributes 0% while still counting as a third resource in the average
+        assertThat(avg, is((50.0 + 25.0)/3));
+    }
+    
+    @Test
+    public void testCalculateAvgThrowsIfTotalIsMissingCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 5)));
+        //NOTE(review): despite the test name the total does have cpu (2); the failure is expected because used cpu (5) exceeds it
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateAveragePercentageUsedBy(usedResources, 0, 0);
+    }
+    
+    @Test
+    public void testCalculateAvgThrowsIfTotalIsMissingMemory() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        //NOTE(review): despite the test name total memory is 100; the failure is expected because used memory (500) exceeds it
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateAveragePercentageUsedBy(usedResources, 100, 500);
+    }
+    
+    @Test
+    public void testCalculateAvgWithResourceMissingFromTotal() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //gpu is used but absent from the total, so the used resources are not a subset of the total
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);        
+    }
+    
+    @Test
+    public void testCalculateAvgWithTooLittleResourceInTotal() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 5.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //used gpu (5) exceeds the gpu available in the total (1)
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateAveragePercentageUsedBy(usedResources, 4, 1);        
+    }
+    
+    @Test
+    public void testCalculateMinUsageWithNoResourcesInTotal() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.emptyMap()));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.emptyMap()));
+        //Nothing in the total and nothing used: the expected answer is 100%
+        double min = resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+        
+        assertThat(min, is(100.0));
+    }
+    
+    @Test
+    public void testCalculateMinWithOnlyCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        
+        double min = resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+        
+        assertThat(min, is(50.0));
+    }
+    
+    @Test
+    public void testCalculateMinWithCpuAndMem() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        //cpu is 1 of 2 (50%) and memory is 1 of 4 (25%), so memory is the minimum
+        double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+        
+        assertThat(min, is(25.0));
+    }
+    
+    @Test
+    public void testCalculateMinWithCpuMemAndGenericResource() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 10.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //cpu is 1 of 2 (50%), memory is 1 of 4 (25%) and gpu is 1 of 10 (10%), so gpu is the minimum
+        double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+        
+        assertThat(min, is(10.0));
+    }
+    
+    @Test
+    public void testCalculateMinWithUnusedResource() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 10.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        
+        double min = resources.calculateMinPercentageUsedBy(usedResources, 4, 1);
+        
+        //The resource that is not used should count as if it is being used 0%
+        assertThat(min, is(0.0));
+    }
+    
+    @Test
+    public void testCalculateMinThrowsIfTotalIsMissingCpu() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 5)));
+        //NOTE(review): despite the test name the total does have cpu (2); the failure is expected because used cpu (5) exceeds it
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateMinPercentageUsedBy(usedResources, 0, 0);
+    }
+    
+    @Test
+    public void testCalculateMinThrowsIfTotalIsMissingMemory() {
+        NormalizedResources resources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 2)));
+        NormalizedResources usedResources = new NormalizedResources(normalize(Collections.singletonMap(Constants.COMMON_CPU_RESOURCE_NAME, 1)));
+        //NOTE(review): despite the test name total memory is 100; the failure is expected because used memory (500) exceeds it
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateMinPercentageUsedBy(usedResources, 100, 500);
+    }
+    
+    @Test
+    public void testCalculateMinWithResourceMissingFromTotal() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //gpu is used but absent from the total, so the used resources are not a subset of the total
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateMinPercentageUsedBy(usedResources, 4, 1);        
+    }
+    
+    @Test
+    public void testCalculateMinWithTooLittleResourceInTotal() {
+        Map<String, Double> allResourcesMap = new HashMap<>();
+        allResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 2.0);
+        allResourcesMap.put(gpuResourceName, 1.0);
+        NormalizedResources resources = new NormalizedResources(normalize(allResourcesMap));
+        Map<String, Double> usedResourcesMap = new HashMap<>();
+        usedResourcesMap.put(Constants.COMMON_CPU_RESOURCE_NAME, 1.0);
+        usedResourcesMap.put(gpuResourceName, 5.0);
+        NormalizedResources usedResources = new NormalizedResources(normalize(usedResourcesMap));
+        //used gpu (5) exceeds the gpu available in the total (1)
+        expectedException.expect(IllegalArgumentException.class);
+        resources.calculateMinPercentageUsedBy(usedResources, 4, 1);        
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
new file mode 100644
index 0000000..f6dedb3
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import org.apache.storm.scheduler.resource.ResourceMapArrayBridge;
+import org.apache.storm.scheduler.resource.NormalizedResources;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+public class ResourceMapArrayBridgeTest {
+
+    private final String gpuResourceName = "gpu";
+    private final String disksResourceName = "disks";
+    
+    private Map<String, Double> normalize(Map<String, ? extends Number> resources) {
+        return NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+    }
+    
+    @Test
+    public void testCanTranslateBackAndForthBetweenMapAndArrayConsistently() {
+        ResourceMapArrayBridge bridge = new ResourceMapArrayBridge();
+        //Two generic resources, so a fresh bridge should hand out exactly two indexes
+        Map<String, Double> allResources = new HashMap<>();
+        allResources.put(gpuResourceName, 2.0);
+        allResources.put(disksResourceName, 64.0);
+        Map<String, Double> normalizedResources = normalize(allResources);
+        double[] resources = bridge.translateToResourceArray(normalizedResources);
+        //The name-to-index mapping tells us where each value must have landed in the array
+        Map<String, Integer> resourceNamesToArrayIndex = bridge.getResourceNamesToArrayIndex();
+        assertThat(resourceNamesToArrayIndex.size(), is(2));
+        int gpuIndex = resourceNamesToArrayIndex.get(gpuResourceName);
+        int disksIndex = resourceNamesToArrayIndex.get(disksResourceName);
+        
+        assertThat(resources.length, is(2));
+        assertThat(resources[gpuIndex], is(2.0));
+        assertThat(resources[disksIndex], is(64.0));
+        //Array back to map must give the normalized input again
+        Map<String, Double> roundTrippedResources = bridge.translateFromResourceArray(resources);
+        assertThat(roundTrippedResources, is(normalizedResources));
+        //Map to array a second time must reproduce the first array, proving the indexes are stable
+        double[] roundTrippedResourceArray = bridge.translateToResourceArray(roundTrippedResources);
+        assertThat(roundTrippedResourceArray, equalTo(resources));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
deleted file mode 100644
index 26bc14d..0000000
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/NormalizedResourcesRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource.strategies.scheduling;
-
-import org.apache.storm.scheduler.resource.NormalizedResources;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-public class NormalizedResourcesRule implements TestRule {
-
-    public static class NRStatement extends Statement {
-        private final Statement base;
-
-        public NRStatement(Statement base) {
-            this.base = base;
-        }
-
-        @Override
-        public void evaluate() throws Throwable {
-            NormalizedResources.resetResourceNames();
-            try {
-                base.evaluate();
-            } finally {
-                NormalizedResources.resetResourceNames();
-            }
-        }
-    }
-
-    @Override
-    public Statement apply(Statement statement, Description description) {
-        return new NRStatement(statement);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/41db23fe/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 6a3fcbb..f82d0f7 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesRule;
 import java.util.Collections;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;


[4/6] storm git commit: Move some resource normalization classes to a new package since the resource package was getting crowded

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
new file mode 100644
index 0000000..fc9182d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+
+/**
+ * Normalizes resource maps by translating config-specific resource keys to the common resource names in {@link Constants}.
+ */
+public class ResourceNameNormalizer {
+
+    private final Map<String, String> resourceNameMapping;
+
+    /**
+     * Creates a new resource name normalizer with a fixed, unmodifiable mapping for the CPU and memory config keys.
+     */
+    public ResourceNameNormalizer() {
+        Map<String, String> tmp = new HashMap<>();
+        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
+        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
+        resourceNameMapping = Collections.unmodifiableMap(tmp);
+    }
+
+    /**
+     * Normalizes the keys of a supervisor or topology resource map to the common resource names, keeping unmapped keys as-is.
+     *
+     * @param resourceMap resource map of either Supervisor or Topology, may be null
+     * @return a new mutable map with common resource names and double values, empty if resourceMap is null
+     */
+    public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
+        if (resourceMap == null) {
+            return new HashMap<>();
+        }
+        return new HashMap<>(resourceMap.entrySet().stream()
+            .collect(Collectors.toMap(
+                //Map the key if needed; toMap throws IllegalStateException if two keys end up with the same name
+                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+                //Convert the value to a double
+                (e) -> e.getValue().doubleValue())));
+    }
+
+    public Map<String, String> getResourceNameMapping() {
+        return resourceNameMapping;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 73e1aa0..02d06a9 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -38,7 +38,7 @@ import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 6f33251..cd16259 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -72,7 +72,7 @@ import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.security.auth.SingleUserPrincipal;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 360db3a..85fb569 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.scheduler.resource;
 
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 3d37ad9..f534f2e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.scheduler.resource;
 
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.generated.Bolt;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
index 1f78f53..b6952c6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesRule.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.scheduler.resource.normalization;
 
-import org.apache.storm.scheduler.resource.NormalizedResources;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
index d25e76c..591207d 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesTest.java
@@ -16,7 +16,6 @@
 
 package org.apache.storm.scheduler.resource.normalization;
 
-import org.apache.storm.scheduler.resource.NormalizedResources;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
index f6dedb3..1476d89 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridgeTest.java
@@ -16,8 +16,6 @@
 
 package org.apache.storm.scheduler.resource.normalization;
 
-import org.apache.storm.scheduler.resource.ResourceMapArrayBridge;
-import org.apache.storm.scheduler.resource.NormalizedResources;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;