You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/01/08 21:20:58 UTC

[5/6] storm git commit: Move some resource normalization classes to a new package since the resource package was getting crowded

Move some resource normalization classes to a new package since the resource package was getting crowded


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88533346
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88533346
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88533346

Branch: refs/heads/master
Commit: 88533346dbfb318d8bf623703f6a16a1eab19534
Parents: 41db23f
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sun Jan 7 18:12:55 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Jan 7 18:43:11 2018 +0100

----------------------------------------------------------------------
 storm-server/pom.xml                            |   2 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   2 +-
 .../supervisor/timer/SupervisorHeartbeat.java   |   2 +-
 .../org/apache/storm/scheduler/Cluster.java     |   6 +-
 .../storm/scheduler/ISchedulingState.java       |   5 +-
 .../storm/scheduler/SupervisorDetails.java      |   2 +-
 .../apache/storm/scheduler/TopologyDetails.java |   2 +-
 .../resource/NormalizedResourceOffer.java       | 114 ------
 .../resource/NormalizedResourceRequest.java     | 190 ----------
 .../scheduler/resource/NormalizedResources.java | 339 ------------------
 .../resource/NormalizedResourcesWithMemory.java |  28 --
 .../storm/scheduler/resource/RAS_Node.java      |   1 +
 .../resource/ResourceMapArrayBridge.java        |  89 -----
 .../resource/ResourceNameNormalizer.java        |  65 ----
 .../storm/scheduler/resource/ResourceUtils.java |   2 +
 .../normalization/NormalizedResourceOffer.java  | 114 ++++++
 .../NormalizedResourceRequest.java              | 190 ++++++++++
 .../normalization/NormalizedResources.java      | 343 +++++++++++++++++++
 .../NormalizedResourcesWithMemory.java          |  28 ++
 .../normalization/ResourceMapArrayBridge.java   |  89 +++++
 .../normalization/ResourceNameNormalizer.java   |  68 ++++
 .../scheduling/BaseResourceAwareStrategy.java   |   2 +-
 .../org/apache/storm/utils/ServerUtils.java     |   2 +-
 .../resource/TestResourceAwareScheduler.java    |   1 +
 .../TestUtilsForResourceAwareScheduler.java     |   1 +
 .../normalization/NormalizedResourcesRule.java  |   1 -
 .../normalization/NormalizedResourcesTest.java  |   1 -
 .../ResourceMapArrayBridgeTest.java             |   2 -
 28 files changed, 849 insertions(+), 842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index ff917ca..7c301a4 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -130,7 +130,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>2620</maxAllowedViolations>
+                    <maxAllowedViolations>2617</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 2c785a5..e66dbe5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -150,7 +150,7 @@ import org.apache.storm.scheduler.blacklist.BlacklistScheduler;
 import org.apache.storm.scheduler.multitenant.MultitenantScheduler;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.ResourceUtils;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.security.auth.AuthUtils;
 import org.apache.storm.security.auth.IAuthorizer;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 3faf1e4..2be241a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -27,7 +27,7 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorInfo;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1ed88c7..625fe6f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -35,9 +35,9 @@ import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.NormalizedResources;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 7175b7e..187a03c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -22,11 +22,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 
 /** An interface that provides access to the current scheduling state. */
 public interface ISchedulingState {

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 3d08715..242b54c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.storm.Constants;
-import org.apache.storm.scheduler.resource.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 8e4c1d5..41e7edf 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -35,7 +35,7 @@ import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.SharedMemory;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.scheduler.resource.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
deleted file mode 100644
index 787bbce..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceOffer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Map;
-import org.apache.storm.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An offer of resources with normalized resource names.
- */
-public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
-    private final NormalizedResources normalizedResources;
-    private double totalMemoryMb;
-
-    /**
-     * Create a new normalized resource offer.
-     *
-     * @param resources the resources to be normalized.
-     */
-    public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
-        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
-        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
-        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
-    }
-
-    public NormalizedResourceOffer() {
-        this((Map<String, ? extends Number>) null);
-    }
-
-    public NormalizedResourceOffer(NormalizedResourceOffer other) {
-        this.totalMemoryMb = other.totalMemoryMb;
-        this.normalizedResources = new NormalizedResources(other.normalizedResources);
-    }
-
-    @Override
-    public double getTotalMemoryMb() {
-        return totalMemoryMb;
-    }
-
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = normalizedResources.toNormalizedMap();
-        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
-        return ret;
-    }
-
-    public void add(NormalizedResourcesWithMemory other) {
-        normalizedResources.add(other.getNormalizedResources());
-        totalMemoryMb += other.getTotalMemoryMb();
-    }
-
-    public void remove(NormalizedResourcesWithMemory other) {
-        normalizedResources.remove(other.getNormalizedResources());
-        totalMemoryMb -= other.getTotalMemoryMb();
-        if (totalMemoryMb < 0.0) {
-            normalizedResources.throwBecauseResourceBecameNegative(
-                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
-        };
-    }
-
-    /**
-     * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
-     * double, double).
-     */
-    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
-        return normalizedResources.calculateAveragePercentageUsedBy(
-            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
-    }
-
-    /**
-     * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
-     * double)
-     */
-    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
-        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
-    }
-
-    /**
-     * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
-     * double).
-     */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
-        return normalizedResources.couldHoldIgnoringSharedMemory(
-            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
-    }
-
-    public double getTotalCpu() {
-        return normalizedResources.getTotalCpu();
-    }
-
-    @Override
-    public NormalizedResources getNormalizedResources() {
-        return normalizedResources;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
deleted file mode 100644
index 2af90ab..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourceRequest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.WorkerResources;
-import org.apache.storm.utils.ObjectReader;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A resource request with normalized resource names.
- */
-public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
-
-    private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
-        if (!dest.containsKey(destKey)) {
-            Number value = (Number) src.get(srcKey);
-            if (value != null) {
-                dest.put(destKey, value.doubleValue());
-            }
-        }
-    }
-
-    private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
-        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
-            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
-        putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
-        putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
-        putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
-        return ret;
-    }
-
-    private static Map<String, Double> parseResources(String input) {
-        Map<String, Double> topologyResources = new HashMap<>();
-        JSONParser parser = new JSONParser();
-        LOG.debug("Input to parseResources {}", input);
-        try {
-            if (input != null) {
-                Object obj = parser.parse(input);
-                JSONObject jsonObject = (JSONObject) obj;
-
-                // Legacy resource parsing
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
-                    Double topoMemOnHeap = ObjectReader
-                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
-                }
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
-                    Double topoMemOffHeap = ObjectReader
-                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
-                }
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
-                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
-                        null);
-                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
-                }
-
-                // If resource is also present in resources map will overwrite the above
-                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
-                    Map<String, Number> rawResourcesMap =
-                        (Map<String, Number>) jsonObject.computeIfAbsent(
-                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
-
-                    for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
-                        topologyResources.put(
-                            stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
-                    }
-
-                }
-            }
-        } catch (ParseException e) {
-            LOG.error("Failed to parse component resources is:" + e.toString(), e);
-            return null;
-        }
-        return topologyResources;
-    }
-
-    private final NormalizedResources normalizedResources;
-    private double onHeap;
-    private double offHeap;
-
-    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-        Map<String, Double> defaultResources) {
-        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
-        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
-        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        normalizedResources = new NormalizedResources(normalizedResourceMap);
-    }
-
-    public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
-        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest(Map<String, Object> topoConf) {
-        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest() {
-        this((Map<String, ? extends Number>) null, null);
-    }
-
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
-        ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
-        ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
-        return ret;
-    }
-
-    public double getOnHeapMemoryMb() {
-        return onHeap;
-    }
-
-    public void addOnHeap(final double onHeap) {
-        this.onHeap += onHeap;
-    }
-
-    public double getOffHeapMemoryMb() {
-        return offHeap;
-    }
-
-    public void addOffHeap(final double offHeap) {
-        this.offHeap += offHeap;
-    }
-
-    /**
-     * Add the resources in other to this.
-     *
-     * @param other the other Request to add to this.
-     */
-    public void add(NormalizedResourceRequest other) {
-        this.normalizedResources.add(other.normalizedResources);
-        onHeap += other.onHeap;
-        offHeap += other.offHeap;
-    }
-
-    public void add(WorkerResources value) {
-        this.normalizedResources.add(value);
-        //The resources are already normalized
-        Map<String, Double> resources = value.get_resources();
-        onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-    }
-
-    @Override
-    public double getTotalMemoryMb() {
-        return onHeap + offHeap;
-    }
-
-    @Override
-    public String toString() {
-        return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
-    }
-
-    public double getTotalCpu() {
-        return this.normalizedResources.getTotalCpu();
-    }
-
-    @Override
-    public NormalizedResources getNormalizedResources() {
-        return this.normalizedResources;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
deleted file mode 100644
index f8e911a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResources.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.Constants;
-import org.apache.storm.generated.WorkerResources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
- * does not keep track of memory as a resource.
- */
-public class NormalizedResources {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
-
-    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
-    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
-
-    static {
-        resetResourceNames();
-    }
-
-    private double cpu;
-    private double[] otherResources;
-
-    /**
-     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
-     * algorithms sadly have different behavior if a resource exists or not.
-     */
-    @VisibleForTesting
-    public static void resetResourceNames() {
-        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
-        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
-    }
-
-    /**
-     * Copy constructor.
-     */
-    public NormalizedResources(NormalizedResources other) {
-        cpu = other.cpu;
-        otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
-    }
-
-    /**
-     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
-     * offers because of how on heap vs off heap is used.
-     *
-     * @param normalizedResources the normalized resource map
-     * @param getTotalMemoryMb Supplier of total memory in MB.
-     */
-    public NormalizedResources(Map<String, Double> normalizedResources) {
-        cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
-    }
-
-    /**
-     * Get the total amount of cpu.
-     *
-     * @return the amount of cpu.
-     */
-    public double getTotalCpu() {
-        return cpu;
-    }
-    
-    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
-        if (requiredLength > otherResources.length) {
-            double[] newResources = new double[requiredLength];
-            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
-            otherResources = newResources;
-        }
-    }
-
-    private void add(double[] resourceArray) {
-        int otherLength = resourceArray.length;
-        zeroPadOtherResourcesIfNecessary(otherLength);
-        for (int i = 0; i < otherLength; i++) {
-            otherResources[i] += resourceArray[i];
-        }
-    }
-
-    public void add(NormalizedResources other) {
-        this.cpu += other.cpu;
-        add(other.otherResources);
-    }
-
-    /**
-     * Add the resources from a worker to this.
-     *
-     * @param value the worker resources that should be added to this.
-     */
-    public void add(WorkerResources value) {
-        Map<String, Double> workerNormalizedResources = value.get_resources();
-        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
-        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
-    }
-
-    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
-        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
-            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
-            resourceName, currentValue, subtractedValue));
-    }
-    
-    /**
-     * Remove the other resources from this. This is the same as subtracting the resources in other from this.
-     *
-     * @param other the resources we want removed.
-     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
-     */
-    public void remove(NormalizedResources other) {
-        this.cpu -= other.cpu;
-        if (cpu < 0.0) {
-            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
-        }
-        int otherLength = other.otherResources.length;
-        zeroPadOtherResourcesIfNecessary(otherLength);
-        for (int i = 0; i < otherLength; i++) {
-            otherResources[i] -= other.otherResources[i];
-            if (otherResources[i] < 0.0) {
-                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Normalized resources: " + toNormalizedMap();
-    }
-
-    /**
-     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
-     * user.
-     */
-    public Map<String, Double> toNormalizedMap() {
-        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
-        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
-        return ret;
-    }
-
-    private double getResourceAt(int index) {
-        if (index >= otherResources.length) {
-            return 0.0;
-        }
-        return otherResources[index];
-    }
-
-    /**
-     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
-     * does not check memory because with shared memory it is beyond the scope of this.
-     *
-     * @param other the resources that we want to check if they would fit in this.
-     * @param thisTotalMemoryMb The total memory in MB of this
-     * @param otherTotalMemoryMb The total memory in MB of other
-     * @return true if it might fit, else false if it could not possibly fit.
-     */
-    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
-        if (this.cpu < other.getTotalCpu()) {
-            return false;
-        }
-        int length = Math.max(this.otherResources.length, other.otherResources.length);
-        for (int i = 0; i < length; i++) {
-            if (getResourceAt(i) < other.getResourceAt(i)) {
-                return false;
-            }
-        }
-
-        return thisTotalMemoryMb >= otherTotalMemoryMb;
-    }
-
-    private String getResourceNameForResourceIndex(int resourceIndex) {
-        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
-            int index = entry.getValue();
-            if (index == resourceIndex) {
-                return entry.getKey();
-            }
-        }
-        return null;
-    }
-
-    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
-            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
-    }
-
-    /**
-     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
-     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
-     * division by 0. If all resources are skipped the result is defined to be 100.0.
-     *
-     * @param used the amount of resources used.
-     * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
-     * @return the average percentage used 0.0 to 100.0.
-     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
-     */
-    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
-        }
-
-        int skippedResourceTypes = 0;
-        double total = 0.0;
-        if (usedMemoryMb > totalMemoryMb) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalMemoryMb != 0.0) {
-            total += usedMemoryMb / totalMemoryMb;
-        } else {
-            skippedResourceTypes++;
-        }
-        double totalCpu = getTotalCpu();
-        if (used.getTotalCpu() > getTotalCpu()) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalCpu != 0.0) {
-            total += used.getTotalCpu() / getTotalCpu();
-        } else {
-            skippedResourceTypes++;
-        }
-
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-
-        for (int i = 0; i < otherResources.length; i++) {
-            double totalValue = otherResources[i];
-            double usedValue;
-            if (i >= used.otherResources.length) {
-                //Resources missing from used are using none of that resource
-                usedValue = 0.0;
-            } else {
-                usedValue = used.otherResources[i];
-            }
-            if (usedValue > totalValue) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-            }
-            if (totalValue == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                skippedResourceTypes++;
-                continue;
-            }
-
-            total += usedValue / totalValue;
-        }
-        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
-        int divisor = 2 + otherResources.length - skippedResourceTypes;
-        if (divisor == 0) {
-            /*
-             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, becase there are
-             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
-             */
-            return 100.0;
-        } else {
-            return (total * 100.0) / divisor;
-        }
-    }
-
-    /**
-     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
-     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
-     * division by 0. If all resources are skipped the result is defined to be 100.0.
-     *
-     * @param used the amount of resources used.
-     * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
-     * @return the minimum percentage used 0.0 to 100.0.
-     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
-     */
-    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
-        }
-
-        double min = 1.0;
-        if (usedMemoryMb > totalMemoryMb) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalMemoryMb != 0.0) {
-            min = Math.min(min, usedMemoryMb / totalMemoryMb);
-        }
-        double totalCpu = getTotalCpu();
-        if (used.getTotalCpu() > totalCpu) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        if (totalCpu != 0.0) {
-            min = Math.min(min, used.getTotalCpu() / totalCpu);
-        }
-
-        if (used.otherResources.length > otherResources.length) {
-            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-        }
-        
-        for (int i = 0; i < otherResources.length; i++) {
-            if (otherResources[i] == 0.0) {
-                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
-                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
-                continue;
-            }
-            if (i >= used.otherResources.length) {
-                //Resources missing from used are using none of that resource
-                return 0;
-            }
-            if (used.otherResources[i] > otherResources[i]) {
-                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
-            }
-            min = Math.min(min, used.otherResources[i] / otherResources[i]);
-        }
-        return min * 100.0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
deleted file mode 100644
index 640233a..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/NormalizedResourcesWithMemory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-/**
- * Intended for {@link NormalizedResources} wrappers that handle memory.
- */
-public interface NormalizedResourcesWithMemory {
-
-    NormalizedResources getNormalizedResources();
-
-    double getTotalMemoryMb();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index db9ca73..84a95a7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
deleted file mode 100644
index cf4f80b..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceMapArrayBridge.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.Constants;
-
-/**
- * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
- * full normalized resource map as an optimization. See {@link NormalizedResources}.
- */
-public class ResourceMapArrayBridge {
-
-    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
-    private final AtomicInteger counter = new AtomicInteger(0);
-
-    /**
-     * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
-     * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
-     * this method, as they are expected to be captured elsewhere.
-     *
-     * @param normalizedResources The resources to translate to an array
-     * @return The array of resource values
-     */
-    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
-        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
-        for (String key : normalizedResources.keySet()) {
-            //We are going to skip over CPU and Memory, because they are captured elsewhere
-            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
-                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
-                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
-            }
-        }
-        //By default all of the values are 0
-        double[] ret = new double[counter.get()];
-        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
-            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
-            if (index != null) {
-                //index == null if it is memory or CPU
-                ret[index] = entry.getValue();
-            }
-        }
-        return ret;
-    }
-
-    /**
-     * Translates an array of resource values to a normalized resource map.
-     *
-     * @param resources The resource array to translate
-     * @return The normalized resource map
-     */
-    public Map<String, Double> translateFromResourceArray(double[] resources) {
-        Map<String, Double> ret = new HashMap<>();
-        int length = resources.length;
-        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
-            int index = entry.getValue();
-            if (index < length) {
-                ret.put(entry.getKey(), resources[index]);
-            }
-        }
-        return ret;
-    }
-
-    public Map<String, Integer> getResourceNamesToArrayIndex() {
-        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
deleted file mode 100644
index d59ba92..0000000
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceNameNormalizer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2018 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-
-/**
- * Provides resource name normalization for resource maps.
- */
-public class ResourceNameNormalizer {
-
-    private final Map<String, String> resourceNameMapping;
-
-    public ResourceNameNormalizer() {
-        Map<String, String> tmp = new HashMap<>();
-        tmp.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_CPU_CAPACITY, Constants.COMMON_CPU_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME);
-        tmp.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME);
-        resourceNameMapping = Collections.unmodifiableMap(tmp);
-    }
-
-    /**
-     * Normalizes a supervisor resource map or topology details map's keys to universal resource names.
-     *
-     * @param resourceMap resource map of either Supervisor or Topology
-     * @return the resource map with common resource names
-     */
-    public Map<String, Double> normalizedResourceMap(Map<String, ? extends Number> resourceMap) {
-        if (resourceMap == null) {
-            return new HashMap<>();
-        }
-        return new HashMap<>(resourceMap.entrySet().stream()
-            .collect(Collectors.toMap(
-                //Map the key if needed
-                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
-                //Map the value
-                (e) -> e.getValue().doubleValue())));
-    }
-
-    public Map<String, String> getResourceNameMapping() {
-        return resourceNameMapping;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index ca4ea63..2f9ab4c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -25,6 +25,8 @@ import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
new file mode 100644
index 0000000..417dca9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An offer of resources with normalized resource names.
+ */
+public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceOffer.class);
+    private final NormalizedResources normalizedResources;
+    private double totalMemoryMb;
+
+    /**
+     * Create a new normalized resource offer.
+     *
+     * @param resources the resources to be normalized.
+     */
+    public NormalizedResourceOffer(Map<String, ? extends Number> resources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources);
+        totalMemoryMb = normalizedResourceMap.getOrDefault(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, 0.0);
+        this.normalizedResources = new NormalizedResources(normalizedResourceMap);
+    }
+
+    public NormalizedResourceOffer() {
+        this((Map<String, ? extends Number>) null);
+    }
+
+    public NormalizedResourceOffer(NormalizedResourceOffer other) {
+        this.totalMemoryMb = other.totalMemoryMb;
+        this.normalizedResources = new NormalizedResources(other.normalizedResources);
+    }
+
+    @Override
+    public double getTotalMemoryMb() {
+        return totalMemoryMb;
+    }
+
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
+        return ret;
+    }
+
+    public void add(NormalizedResourcesWithMemory other) {
+        normalizedResources.add(other.getNormalizedResources());
+        totalMemoryMb += other.getTotalMemoryMb();
+    }
+
+    public void remove(NormalizedResourcesWithMemory other) {
+        normalizedResources.remove(other.getNormalizedResources());
+        totalMemoryMb -= other.getTotalMemoryMb();
+        if (totalMemoryMb < 0.0) {
+            normalizedResources.throwBecauseResourceBecameNegative(
+                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
+        };
+    }
+
+    /**
+     * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
+     * double, double).
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateAveragePercentageUsedBy(
+            used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double)
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
+        return normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), getTotalMemoryMb(), used.getTotalMemoryMb());
+    }
+
+    /**
+     * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
+     * double).
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
+        return normalizedResources.couldHoldIgnoringSharedMemory(
+            other.getNormalizedResources(), getTotalMemoryMb(), other.getTotalMemoryMb());
+    }
+
+    public double getTotalCpu() {
+        return normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return normalizedResources;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
new file mode 100644
index 0000000..6627bb5
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource request with normalized resource names.
+ */
+public class NormalizedResourceRequest implements NormalizedResourcesWithMemory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResourceRequest.class);
+
+    private static void putIfMissing(Map<String, Double> dest, String destKey, Map<String, Object> src, String srcKey) {
+        if (!dest.containsKey(destKey)) {
+            Number value = (Number) src.get(srcKey);
+            if (value != null) {
+                dest.put(destKey, value.doubleValue());
+            }
+        }
+    }
+
+    private static Map<String, Double> getDefaultResources(Map<String, Object> topoConf) {
+        Map<String, Double> ret = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, Number>) topoConf.getOrDefault(
+            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+        putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+        putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
+        putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+        return ret;
+    }
+
+    private static Map<String, Double> parseResources(String input) {
+        Map<String, Double> topologyResources = new HashMap<>();
+        JSONParser parser = new JSONParser();
+        LOG.debug("Input to parseResources {}", input);
+        try {
+            if (input != null) {
+                Object obj = parser.parse(input);
+                JSONObject jsonObject = (JSONObject) obj;
+
+                // Legacy resource parsing
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
+                    Double topoMemOnHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
+                    Double topoMemOffHeap = ObjectReader
+                        .getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
+                }
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
+                    Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
+                        null);
+                    topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
+                }
+
+                // If resource is also present in resources map will overwrite the above
+                if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP)) {
+                    Map<String, Number> rawResourcesMap =
+                        (Map<String, Number>) jsonObject.computeIfAbsent(
+                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, (k) -> new HashMap<>());
+
+                    for (Map.Entry<String, Number> stringNumberEntry : rawResourcesMap.entrySet()) {
+                        topologyResources.put(
+                            stringNumberEntry.getKey(), stringNumberEntry.getValue().doubleValue());
+                    }
+
+                }
+            }
+        } catch (ParseException e) {
+            LOG.error("Failed to parse component resources is:" + e.toString(), e);
+            return null;
+        }
+        return topologyResources;
+    }
+
+    private final NormalizedResources normalizedResources;
+    private double onHeap;
+    private double offHeap;
+
+    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+        Map<String, Double> defaultResources) {
+        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        normalizedResources = new NormalizedResources(normalizedResourceMap);
+    }
+
+    public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
+        this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
+    }
+
+    public NormalizedResourceRequest(Map<String, Object> topoConf) {
+        this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
+    }
+
+    public NormalizedResourceRequest() {
+        this((Map<String, ? extends Number>) null, null);
+    }
+
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
+        ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
+        ret.put(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, onHeap);
+        return ret;
+    }
+
+    public double getOnHeapMemoryMb() {
+        return onHeap;
+    }
+
+    public void addOnHeap(final double onHeap) {
+        this.onHeap += onHeap;
+    }
+
+    public double getOffHeapMemoryMb() {
+        return offHeap;
+    }
+
+    public void addOffHeap(final double offHeap) {
+        this.offHeap += offHeap;
+    }
+
+    /**
+     * Add the resources in other to this.
+     *
+     * @param other the other Request to add to this.
+     */
+    public void add(NormalizedResourceRequest other) {
+        this.normalizedResources.add(other.normalizedResources);
+        onHeap += other.onHeap;
+        offHeap += other.offHeap;
+    }
+
+    public void add(WorkerResources value) {
+        this.normalizedResources.add(value);
+        //The resources are already normalized
+        Map<String, Double> resources = value.get_resources();
+        onHeap += resources.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+        offHeap += resources.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+    }
+
+    @Override
+    public double getTotalMemoryMb() {
+        return onHeap + offHeap;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+    }
+
+    public double getTotalCpu() {
+        return this.normalizedResources.getTotalCpu();
+    }
+
+    @Override
+    public NormalizedResources getNormalizedResources() {
+        return this.normalizedResources;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
new file mode 100644
index 0000000..76d5ce2
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.storm.Constants;
+import org.apache.storm.generated.WorkerResources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resources that have been normalized. This class is intended as a delegate for more specific types of normalized resource set, since it
+ * does not keep track of memory as a resource.
+ */
+public class NormalizedResources {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NormalizedResources.class);
+
+    public static ResourceNameNormalizer RESOURCE_NAME_NORMALIZER;
+    private static ResourceMapArrayBridge RESOURCE_MAP_ARRAY_BRIDGE;
+
+    static {
+        resetResourceNames();
+    }
+
+    private double cpu;
+    private double[] otherResources;
+
+    /**
+     * This is for testing only. It allows a test to reset the static state relating to resource names. We reset the mapping because some
+     * algorithms sadly have different behavior if a resource exists or not.
+     */
+    @VisibleForTesting
+    public static void resetResourceNames() {
+        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
+    }
+
+    /**
+     * Copy constructor.
+     */
+    public NormalizedResources(NormalizedResources other) {
+        cpu = other.cpu;
+        otherResources = Arrays.copyOf(other.otherResources, other.otherResources.length);
+    }
+
+    /**
+     * Create a new normalized set of resources. Note that memory is not managed by this class, as it is not consistent in requests vs
+     * offers because of how on heap vs off heap is used.
+     *
+     * @param normalizedResources the normalized resource map
+     */
+    public NormalizedResources(Map<String, Double> normalizedResources) {
+        cpu = normalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(normalizedResources);
+    }
+
+    /**
+     * Get the total amount of cpu.
+     *
+     * @return the amount of cpu.
+     */
+    public double getTotalCpu() {
+        return cpu;
+    }
+    
+    private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
+        if (requiredLength > otherResources.length) {
+            double[] newResources = new double[requiredLength];
+            System.arraycopy(otherResources, 0, newResources, 0, otherResources.length);
+            otherResources = newResources;
+        }
+    }
+
+    private void add(double[] resourceArray) {
+        int otherLength = resourceArray.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] += resourceArray[i];
+        }
+    }
+
+    public void add(NormalizedResources other) {
+        this.cpu += other.cpu;
+        add(other.otherResources);
+    }
+
+    /**
+     * Add the resources from a worker to this.
+     *
+     * @param value the worker resources that should be added to this.
+     */
+    public void add(WorkerResources value) {
+        Map<String, Double> workerNormalizedResources = value.get_resources();
+        cpu += workerNormalizedResources.getOrDefault(Constants.COMMON_CPU_RESOURCE_NAME, 0.0);
+        add(RESOURCE_MAP_ARRAY_BRIDGE.translateToResourceArray(workerNormalizedResources));
+    }
+
+    /**
+     * Throw an IllegalArgumentException because a resource became negative during remove.
+     * @param resourceName The name of the resource that became negative
+     * @param currentValue The current value of the resource
+     * @param subtractedValue The value that was subtracted to make the resource negative
+     */
+    public void throwBecauseResourceBecameNegative(String resourceName, double currentValue, double subtractedValue) {
+        throw new IllegalArgumentException(String.format("Resource amounts should never be negative."
+            + " Resource '%s' with current value '%f' became negative because '%f' was removed.",
+            resourceName, currentValue, subtractedValue));
+    }
+    
+    /**
+     * Remove the other resources from this. This is the same as subtracting the resources in other from this.
+     *
+     * @param other the resources we want removed.
+     * @throws IllegalArgumentException if subtracting other from this would result in any resource amount becoming negative.
+     */
+    public void remove(NormalizedResources other) {
+        this.cpu -= other.cpu;
+        if (cpu < 0.0) {
+            throwBecauseResourceBecameNegative(Constants.COMMON_CPU_RESOURCE_NAME, cpu, other.cpu);
+        }
+        int otherLength = other.otherResources.length;
+        zeroPadOtherResourcesIfNecessary(otherLength);
+        for (int i = 0; i < otherLength; i++) {
+            otherResources[i] -= other.otherResources[i];
+            if (otherResources[i] < 0.0) {
+                throwBecauseResourceBecameNegative(getResourceNameForResourceIndex(i), otherResources[i], other.otherResources[i]);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Normalized resources: " + toNormalizedMap();
+    }
+
+    /**
+     * Return a Map of the normalized resource name to a double. This should only be used when returning thrift resource requests to the end
+     * user.
+     */
+    public Map<String, Double> toNormalizedMap() {
+        Map<String, Double> ret = RESOURCE_MAP_ARRAY_BRIDGE.translateFromResourceArray(otherResources);
+        ret.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        return ret;
+    }
+
+    private double getResourceAt(int index) {
+        if (index >= otherResources.length) {
+            return 0.0;
+        }
+        return otherResources[index];
+    }
+
+    /**
+     * A simple sanity check to see if all of the resources in this would be large enough to hold the resources in other ignoring memory. It
+     * does not check memory because with shared memory it is beyond the scope of this.
+     *
+     * @param other the resources that we want to check if they would fit in this.
+     * @param thisTotalMemoryMb The total memory in MB of this
+     * @param otherTotalMemoryMb The total memory in MB of other
+     * @return true if it might fit, else false if it could not possibly fit.
+     */
+    public boolean couldHoldIgnoringSharedMemory(NormalizedResources other, double thisTotalMemoryMb, double otherTotalMemoryMb) {
+        if (this.cpu < other.getTotalCpu()) {
+            return false;
+        }
+        int length = Math.max(this.otherResources.length, other.otherResources.length);
+        for (int i = 0; i < length; i++) {
+            if (getResourceAt(i) < other.getResourceAt(i)) {
+                return false;
+            }
+        }
+
+        return thisTotalMemoryMb >= otherTotalMemoryMb;
+    }
+
+    private String getResourceNameForResourceIndex(int resourceIndex) {
+        for (Map.Entry<String, Integer> entry : RESOURCE_MAP_ARRAY_BRIDGE.getResourceNamesToArrayIndex().entrySet()) {
+            int index = entry.getValue();
+            if (index == resourceIndex) {
+                return entry.getKey();
+            }
+        }
+        return null;
+    }
+
+    private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        throw new IllegalArgumentException(String.format("The used resources must be a subset of the total resources."
+            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
+    }
+
+    /**
+     * Calculate the average resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
+     *
+     * @param used the amount of resources used.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the average percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
+     */
+    public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
+        int skippedResourceTypes = 0;
+        double total = 0.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            total += usedMemoryMb / totalMemoryMb;
+        } else {
+            skippedResourceTypes++;
+        }
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > getTotalCpu()) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalCpu != 0.0) {
+            total += used.getTotalCpu() / getTotalCpu();
+        } else {
+            skippedResourceTypes++;
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+
+        for (int i = 0; i < otherResources.length; i++) {
+            double totalValue = otherResources[i];
+            double usedValue;
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                usedValue = 0.0;
+            } else {
+                usedValue = used.otherResources[i];
+            }
+            if (usedValue > totalValue) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            if (totalValue == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                skippedResourceTypes++;
+                continue;
+            }
+
+            total += usedValue / totalValue;
+        }
+        //Adjust the divisor for the average to account for any skipped resources (those where the total was 0)
+        int divisor = 2 + otherResources.length - skippedResourceTypes;
+        if (divisor == 0) {
+            /*
+             * This is an arbitrary choice to make the result consistent with calculateMin. Any value would be valid here, becase there are
+             * no (non-zero) resources in the total set of resources, so we're trying to average 0 values.
+             */
+            return 100.0;
+        } else {
+            return (total * 100.0) / divisor;
+        }
+    }
+
+    /**
+     * Calculate the minimum resource usage percentage with this being the total resources and used being the amounts used. Used must be a
+     * subset of the total resources. If a resource in the total has a value of zero, it will be skipped in the calculation to avoid
+     * division by 0. If all resources are skipped the result is defined to be 100.0.
+     *
+     * @param used the amount of resources used.
+     * @param totalMemoryMb The total memory in MB
+     * @param usedMemoryMb The used memory in MB
+     * @return the minimum percentage used 0.0 to 100.0.
+     * @throws IllegalArgumentException if any resource in used has a greater value than the same resource in the total, or used has generic
+     *     resources that are not present in the total.
+     */
+    public double calculateMinPercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Calculating min percentage used by. Used Mem: {} Total Mem: {}"
+                + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
+                toNormalizedMap(), used.toNormalizedMap());
+        }
+
+        double min = 1.0;
+        if (usedMemoryMb > totalMemoryMb) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalMemoryMb != 0.0) {
+            min = Math.min(min, usedMemoryMb / totalMemoryMb);
+        }
+        double totalCpu = getTotalCpu();
+        if (used.getTotalCpu() > totalCpu) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        if (totalCpu != 0.0) {
+            min = Math.min(min, used.getTotalCpu() / totalCpu);
+        }
+
+        if (used.otherResources.length > otherResources.length) {
+            throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+        }
+        
+        for (int i = 0; i < otherResources.length; i++) {
+            if (otherResources[i] == 0.0) {
+                //Skip any resources where the total is 0, the percent used for this resource isn't meaningful.
+                //We fall back to prioritizing by cpu, memory and any other resources by ignoring this value
+                continue;
+            }
+            if (i >= used.otherResources.length) {
+                //Resources missing from used are using none of that resource
+                return 0;
+            }
+            if (used.otherResources[i] > otherResources[i]) {
+                throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, usedMemoryMb);
+            }
+            min = Math.min(min, used.otherResources[i] / otherResources[i]);
+        }
+        return min * 100.0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
new file mode 100644
index 0000000..5001645
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+/**
+ * Intended for {@link NormalizedResources} wrappers that handle memory.
+ */
+public interface NormalizedResourcesWithMemory {
+
+    NormalizedResources getNormalizedResources();
+
+    double getTotalMemoryMb();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/88533346/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
new file mode 100644
index 0000000..2b07c65
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.normalization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.Constants;
+
+/**
+ * Provides translation between normalized resource maps and resource value arrays. Some operations use resource value arrays instead of the
+ * full normalized resource map as an optimization. See {@link NormalizedResources}.
+ */
+public class ResourceMapArrayBridge {
+
+    private final ConcurrentMap<String, Integer> resourceNamesToArrayIndex = new ConcurrentHashMap<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    /**
+     * Translates a normalized resource map to an array of resource values. Each resource name will be assigned an index in the array, which
+     * is guaranteed to be consistent with subsequent invocations of this method. Note that CPU and memory resources are not translated by
+     * this method, as they are expected to be captured elsewhere.
+     *
+     * @param normalizedResources The resources to translate to an array
+     * @return The array of resource values
+     */
+    public double[] translateToResourceArray(Map<String, Double> normalizedResources) {
+        //To avoid locking we will go through the map twice.  It should be small so it is probably not a big deal
+        for (String key : normalizedResources.keySet()) {
+            //We are going to skip over CPU and Memory, because they are captured elsewhere
+            if (!Constants.COMMON_CPU_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME.equals(key)
+                && !Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME.equals(key)) {
+                resourceNamesToArrayIndex.computeIfAbsent(key, (k) -> counter.getAndIncrement());
+            }
+        }
+        //By default all of the values are 0
+        double[] ret = new double[counter.get()];
+        for (Map.Entry<String, Double> entry : normalizedResources.entrySet()) {
+            Integer index = resourceNamesToArrayIndex.get(entry.getKey());
+            if (index != null) {
+                //index == null if it is memory or CPU
+                ret[index] = entry.getValue();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Translates an array of resource values to a normalized resource map.
+     *
+     * @param resources The resource array to translate
+     * @return The normalized resource map
+     */
+    public Map<String, Double> translateFromResourceArray(double[] resources) {
+        Map<String, Double> ret = new HashMap<>();
+        int length = resources.length;
+        for (Map.Entry<String, Integer> entry : resourceNamesToArrayIndex.entrySet()) {
+            int index = entry.getValue();
+            if (index < length) {
+                ret.put(entry.getKey(), resources[index]);
+            }
+        }
+        return ret;
+    }
+
+    public Map<String, Integer> getResourceNamesToArrayIndex() {
+        return Collections.unmodifiableMap(resourceNamesToArrayIndex);
+    }
+
+}