Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/14 17:36:39 UTC

[1/5] storm git commit: STORM-3040: Improve scheduler performance

Repository: storm
Updated Branches:
  refs/heads/master 5cb4582ff -> 53f38bc31


http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 222c0cd..366ee78 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -12,6 +12,8 @@
 
 package org.apache.storm.scheduler.resource;
 
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
@@ -112,6 +114,7 @@ public class TestUtilsForResourceAwareScheduler {
                                              Map<String, Map<String, Number>> pools) {
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, GenSupervisorsDnsToSwitchMapping.class.getName());
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
         config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, compPcore);
@@ -139,19 +142,65 @@ public class TestUtilsForResourceAwareScheduler {
 
     public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, int start,
                                                                 double cpu, double mem, Map<String, Double> miscResources) {
+        return genSupervisorsWithRacks(1, numSup, numPorts, 0, start, cpu, mem, miscResources);
+
+    }
+
+    private static final Pattern HOST_NAME_PATTERN = Pattern.compile("^(host-\\d+)-(.+)$");
+
+    public static String hostNameToRackName(String hostName) {
+        Matcher m = HOST_NAME_PATTERN.matcher(hostName);
+        if (m.matches()) {
+            return m.group(2);
+        }
+        return DNSToSwitchMapping.DEFAULT_RACK;
+    }
+
+    private static final Pattern SUPERVISOR_ID_PATTERN = Pattern.compile("^(r\\d+)s(\\d+)$");
+
+    public static String supervisorIdToRackName(String hostName) {
+        Matcher m = SUPERVISOR_ID_PATTERN.matcher(hostName);
+        if (m.matches()) {
+            return m.group(1);
+        }
+        return DNSToSwitchMapping.DEFAULT_RACK;
+    }
+
+    public static class GenSupervisorsDnsToSwitchMapping implements DNSToSwitchMapping {
+
+        private Map<String, String> mappingCache = new ConcurrentHashMap<>();
+
+        @Override
+        public Map<String,String> resolve(List<String> names) {
+
+            Map<String, String> m = new HashMap<>();
+            for (String name : names) {
+                m.put(name, mappingCache.computeIfAbsent(name, TestUtilsForResourceAwareScheduler::hostNameToRackName));
+            }
+            return m;
+        }
+    }
+
+    public static Map<String, SupervisorDetails> genSupervisorsWithRacks(int numRacks, int numSupersPerRack, int numPorts, int rackStart,
+                                                                         int superInRackStart, double cpu, double mem,
+                                                                         Map<String, Double> miscResources) {
         Map<String, Double> resourceMap = new HashMap<>();
         resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
         resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
         resourceMap.putAll(miscResources);
-        Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
-        for (int i = start; i < numSup + start; i++) {
-            List<Number> ports = new LinkedList<>();
-            for (int j = 0; j < numPorts; j++) {
-                ports.add(j);
+        Map<String, SupervisorDetails> retList = new HashMap<>();
+        for (int rack = rackStart; rack < numRacks + rackStart; rack++) {
+            for (int superInRack = superInRackStart; superInRack < (numSupersPerRack + superInRackStart); superInRack++) {
+                List<Number> ports = new LinkedList<>();
+                for (int p = 0; p < numPorts; p++) {
+                    ports.add(p);
+                }
+                SupervisorDetails sup = new SupervisorDetails(String.format("r%03ds%03d", rack, superInRack),
+                    String.format("host-%03d-rack-%03d", superInRack, rack), null, ports,
+                    NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
+                retList.put(sup.getId(), sup);
+
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports,
-                                                          NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap));
-            retList.put(sup.getId(), sup);
         }
         return retList;
     }
@@ -201,21 +250,38 @@ public class TestUtilsForResourceAwareScheduler {
                                               int spoutParallelism, int boltParallelism, int launchTime, int priority,
                                               String user) {
 
+        return genTopology(name, config, numSpout, numBolt, spoutParallelism, boltParallelism, launchTime, priority, user,
+            Double.MAX_VALUE);
+    }
+
+    public static TopologyDetails genTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism, int launchTime, int priority,
+                                              String user, double maxHeapSize) {
+        StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+        return topoToTopologyDetails(name, config, topology, launchTime, priority, user, maxHeapSize);
+    }
+
+    public static TopologyDetails topoToTopologyDetails(String name, Map<String, Object> config, StormTopology topology,
+                                                        int launchTime, int priority, String user, double maxHeapSize) {
+
         Config conf = new Config();
         conf.putAll(config);
         conf.put(Config.TOPOLOGY_PRIORITY, priority);
         conf.put(Config.TOPOLOGY_NAME, name);
         conf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
-        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
-        StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, maxHeapSize);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
-                                                   0,
-                                                   genExecsAndComps(topology), launchTime, user);
+            0, genExecsAndComps(topology), launchTime, user);
         return topo;
     }
 
     public static StormTopology buildTopology(int numSpout, int numBolt,
                                               int spoutParallelism, int boltParallelism) {
+        return topologyBuilder(numSpout, numBolt, spoutParallelism, boltParallelism).createTopology();
+    }
+
+    public static TopologyBuilder topologyBuilder(int numSpout, int numBolt,
+                                                  int spoutParallelism, int boltParallelism) {
         LOG.debug("buildTopology with -> numSpout: " + numSpout + " spoutParallelism: "
                   + spoutParallelism + " numBolt: "
                   + numBolt + " boltParallelism: " + boltParallelism);
@@ -235,7 +301,7 @@ public class TestUtilsForResourceAwareScheduler {
             j++;
         }
 
-        return builder.createTopology();
+        return builder;
     }
 
     public static class TestSpout extends BaseRichSpout {
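
For orientation (not part of the commit), a minimal sketch of how the new rack-aware naming scheme above is expected to resolve, based on HOST_NAME_PATTERN, SUPERVISOR_ID_PATTERN and the format strings in genSupervisorsWithRacks; the concrete numbers are illustrative:

    // Illustrative only: rack resolution for the generated supervisor and host names.
    String host  = String.format("host-%03d-rack-%03d", 5, 1);   // "host-005-rack-001"
    String supId = String.format("r%03ds%03d", 1, 5);            // "r001s005"

    // HOST_NAME_PATTERN captures everything after "host-NNN-" as the rack name.
    assert "rack-001".equals(TestUtilsForResourceAwareScheduler.hostNameToRackName(host));
    // SUPERVISOR_ID_PATTERN captures the leading "rNNN" as the rack name.
    assert "r001".equals(TestUtilsForResourceAwareScheduler.supervisorIdToRackName(supId));
    // Names matching neither pattern fall back to DNSToSwitchMapping.DEFAULT_RACK.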

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index f82d0f7..607b815 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -319,7 +319,7 @@ public class TestDefaultResourceAwareStrategy {
         List<String> nodeHostnames = rackToNodes.get("rack-1");
         for (int i = 0; i< topo2.getExecutors().size()/2; i++) {
             String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
-            RAS_Node node = rs.idToNode(rs.nodeHostnameToId(nodeHostname));
+            RAS_Node node = rs.hostnameToNodes(nodeHostname).get(0);
             WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
             ExecutorDetails targetExec = executorIterator.next();
             // to keep track of free slots
@@ -444,7 +444,7 @@ public class TestDefaultResourceAwareStrategy {
         List<String> nodeHostnames = rackToNodes.get("rack-1");
         for (int i = 0; i< topo2.getExecutors().size()/2; i++) {
             String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
-            RAS_Node node = rs.idToNode(rs.nodeHostnameToId(nodeHostname));
+            RAS_Node node = rs.hostnameToNodes(nodeHostname).get(0);
             WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
             ExecutorDetails targetExec = executorIterator.next();
             // to keep track of free slots

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index f7a2a84..2c41237 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.WorkerResources;
@@ -33,7 +35,6 @@ import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
 import org.apache.storm.scheduler.SchedulerAssignment;
-import org.apache.storm.scheduler.SchedulerAssignmentImpl;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.SupervisorResources;
 import org.apache.storm.scheduler.Topologies;
@@ -44,14 +45,12 @@ import org.apache.storm.topology.SharedOffHeapWithinNode;
 import org.apache.storm.topology.SharedOffHeapWithinWorker;
 import org.apache.storm.topology.SharedOnHeap;
 import org.apache.storm.topology.TopologyBuilder;
-import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestGenericResourceAwareStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(TestGenericResourceAwareStrategy.class);
@@ -220,6 +219,66 @@ public class TestGenericResourceAwareStrategy {
             foundScheduling.add(new HashSet<>(execs));
         }
 
-        Assert.assertEquals(expectedScheduling, foundScheduling);
+        assertEquals(expectedScheduling, foundScheduling);
+    }
+
+    @Test
+    public void testAntiAffinityWithMultipleTopologies() {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(1, 40, 66, 0, 0, 4700, 226200, new HashMap<>());
+        HashMap<String, Double> extraResources = new HashMap<>();
+        extraResources.put("my.gpu", 1.0);
+        supMap.putAll(genSupervisorsWithRacks(1, 40, 66, 1, 0, 4700, 226200, extraResources));
+
+        Config config = new Config();
+        config.putAll(createGrasClusterConfig(88, 775, 25, null, null));
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        rs.prepare(config);
+
+        TopologyDetails tdSimple = genTopology("topology-simple", config, 1,
+            5, 100, 300, 0, 0, "user", 8192);
+
+        //Schedule the simple topology first
+        Topologies topologies = new Topologies(tdSimple);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
+        rs.schedule(topologies, cluster);
+
+        TopologyBuilder builder = topologyBuilder(1, 5, 100, 300);
+        builder.setBolt("gpu-bolt", new TestBolt(), 40)
+            .addResource("my.gpu", 1.0)
+            .shuffleGrouping("spout-0");
+        TopologyDetails tdGpu = topoToTopologyDetails("topology-gpu", config, builder.createTopology(), 0, 0,"user", 8192);
+
+        //Now schedule GPU but with the simple topology in place.
+        topologies = new Topologies(tdSimple, tdGpu);
+        cluster = new Cluster(cluster, topologies);
+        rs.schedule(topologies, cluster);
+
+        Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+        assertEquals(2, assignments.size());
+
+        Map<String, Map<String, AtomicLong>> topoPerRackCount = new HashMap<>();
+        for (Entry<String, SchedulerAssignment> entry: assignments.entrySet()) {
+            SchedulerAssignment sa = entry.getValue();
+            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+            for (WorkerSlot slot : sa.getSlots()) {
+                String nodeId = slot.getNodeId();
+                String rack = supervisorIdToRackName(nodeId);
+                slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+            }
+            LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+            topoPerRackCount.put(entry.getKey(), slotsPerRack);
+        }
+
+        Map<String, AtomicLong> simpleCount = topoPerRackCount.get("topology-simple-0");
+        assertNotNull(simpleCount);
+        //Because the simple topology was scheduled first we want to be sure that it didn't put anything on
+        // the GPU nodes.
+        assertEquals(1, simpleCount.size()); //Only 1 rack is in use
+        assertFalse(simpleCount.containsKey("r001")); //r001 is the second rack with GPUs
+        assertTrue(simpleCount.containsKey("r000")); //r000 is the first rack with no GPUs
+
+        //We don't really care too much about the scheduling of topology-gpu-0, because it was scheduled.
     }
 }


[5/5] storm git commit: Merge branch 'STORM-3040' of https://github.com/revans2/incubator-storm into STORM-3040

Posted by bo...@apache.org.
Merge branch 'STORM-3040' of https://github.com/revans2/incubator-storm into STORM-3040

STORM-3040: Improve scheduler performance

This closes #2647


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/53f38bc3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/53f38bc3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/53f38bc3

Branch: refs/heads/master
Commit: 53f38bc31f2fd315a520ba6b86c0a60be08381cc
Parents: 5cb4582 558e9b6
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon May 14 12:15:13 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon May 14 12:15:13 2018 -0500

----------------------------------------------------------------------
 pom.xml                                         |  11 +-
 .../apache/storm/testing/PerformanceTest.java   |  37 ++
 .../org/apache/storm/scheduler/Cluster.java     |  65 +--
 .../storm/scheduler/ISchedulingState.java       |   1 +
 .../scheduler/SchedulerAssignmentImpl.java      |  46 +-
 .../storm/scheduler/resource/RAS_Node.java      |  97 +++--
 .../normalization/NormalizedResourceOffer.java  |  46 +-
 .../NormalizedResourceRequest.java              |  36 +-
 .../normalization/NormalizedResources.java      |  38 +-
 .../NormalizedResourcesWithMemory.java          |   5 +
 .../normalization/ResourceMapArrayBridge.java   |   8 +
 .../scheduling/BaseResourceAwareStrategy.java   | 426 ++++++++++++-------
 .../scheduling/ConstraintSolverStrategy.java    |  23 +-
 .../DefaultResourceAwareStrategy.java           |  14 +-
 .../GenericResourceAwareStrategy.java           |  34 +-
 .../resource/TestResourceAwareScheduler.java    | 137 ++++--
 .../TestUtilsForResourceAwareScheduler.java     |  92 +++-
 .../TestDefaultResourceAwareStrategy.java       |   4 +-
 .../TestGenericResourceAwareStrategy.java       |  69 ++-
 19 files changed, 826 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/53f38bc3/pom.xml
----------------------------------------------------------------------


[3/5] storm git commit: Fixed Checkstyle Issues

Posted by bo...@apache.org.
Fixed Checkstyle Issues


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0be8aed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0be8aed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0be8aed

Branch: refs/heads/master
Commit: d0be8aed6ccba2abf7b32084fe8255f15514563c
Parents: 32392bc
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon May 7 09:56:27 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon May 7 09:56:27 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/testing/PerformanceTest.java   |  4 +--
 .../storm/scheduler/resource/RAS_Node.java      | 37 +++++++++-----------
 .../normalization/NormalizedResourceOffer.java  |  9 +++--
 .../NormalizedResourceRequest.java              |  8 +++++
 .../normalization/NormalizedResources.java      | 11 ++++--
 .../normalization/ResourceMapArrayBridge.java   |  2 +-
 .../GenericResourceAwareStrategy.java           |  4 +++
 7 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
index e657dd8..c5eba2f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -25,13 +25,11 @@ package org.apache.storm.testing;
  * add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
  * For example:
  * <p/>
- *
- *
  * @ Category(PerformanceTest.class)<br/>
  * public class MyPerformanceTest {<br/>
  *  ...<br/>
  * }
- *
+ * <p/>
  *  In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
  *  for running on CI systems like travis ci, or the apache jenkins build.
  */

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 3a9570e..6075d5a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -53,6 +53,14 @@ public class RAS_Node {
     private boolean isAlive;
     private SupervisorDetails sup;
 
+    /**
+     * Create a new node.
+     * @param nodeId the id of the node.
+     * @param sup the supervisor this is for.
+     * @param cluster the cluster this is a part of.
+     * @param workerIdToWorker the mapping of slots already assigned to this node.
+     * @param assignmentMap the mapping of executors already assigned to this node.
+     */
     public RAS_Node(
         String nodeId,
         SupervisorDetails sup,
@@ -99,26 +107,6 @@ public class RAS_Node {
         }
     }
 
-    public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
-        int total = 0;
-        for (RAS_Node n : nodes) {
-            if (n.isAlive()) {
-                total += n.totalSlotsFree();
-            }
-        }
-        return total;
-    }
-
-    public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
-        int total = 0;
-        for (RAS_Node n : nodes) {
-            if (n.isAlive()) {
-                total += n.totalSlots();
-            }
-        }
-        return total;
-    }
-
     public String getId() {
         return nodeId;
     }
@@ -135,6 +123,10 @@ public class RAS_Node {
         return ret;
     }
 
+    /**
+     * Get the IDs of all free slots on this node.
+     * @return the ids of the free slots.
+     */
     public Collection<String> getFreeSlotsId() {
         if (!isAlive) {
             return new HashSet<>();
@@ -164,6 +156,11 @@ public class RAS_Node {
         return workerIdsToWorkers(getUsedSlotsId());
     }
 
+    /**
+     * Get slots used by the given topology.
+     * @param topId the id of the topology to get.
+     * @return the slots currently assigned to that topology on this node.
+     */
     public Collection<WorkerSlot> getUsedSlots(String topId) {
         Collection<WorkerSlot> ret = null;
         if (topIdToUsedSlots.get(topId) != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 790a42b..b0defbb 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -96,7 +96,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     /**
      * Calculate the average percentage used.
      * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
-     *     double, double).
+     *     double, double)
      */
     public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
         return normalizedResources.calculateAveragePercentageUsedBy(
@@ -115,7 +115,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     /**
      * Check if resources might be able to fit.
      * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
-     *     double).
+     *     double)
      */
     public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
         return normalizedResources.couldHoldIgnoringSharedMemory(
@@ -136,6 +136,11 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
         return "Normalized resources: " + toNormalizedMap();
     }
 
+    /**
+     * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+     * be less likely to be selected.
+     * @param requestedResources the requested resources.
+     */
     public void updateForRareResourceAffinity(NormalizedResourceRequest requestedResources) {
         normalizedResources.updateForRareResourceAffinity(requestedResources.getNormalizedResources());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 86033b8..2a30ffc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -134,6 +134,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
         return topologyResources;
     }
 
+    /**
+     * Convert to a map that is used by configuration and the UI.
+     * @return a map with the key as the resource name and the value the resource amount.
+     */
     public Map<String, Double> toNormalizedMap() {
         Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
         ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
@@ -168,6 +172,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
         offHeap += other.offHeap;
     }
 
+    /**
+     * Add the resources from a worker to those in this.
+     * @param value the resources on the worker.
+     */
     public void add(WorkerResources value) {
         this.normalizedResources.add(value);
         //The resources are already normalized

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index e57d8f2..9fb717f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -347,10 +347,15 @@ public class NormalizedResources {
         return min * 100.0;
     }
 
-    public void updateForRareResourceAffinity(NormalizedResources other) {
-        int length = Math.min(this.otherResources.length, other.otherResources.length);
+    /**
+     * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+     * be less likely to be selected.
+     * @param request the requested resources.
+     */
+    public void updateForRareResourceAffinity(NormalizedResources request) {
+        int length = Math.min(this.otherResources.length, request.otherResources.length);
         for (int i = 0; i < length; i++) {
-            if (other.getResourceAt(i) == 0.0) {
+            if (request.getResourceAt(i) == 0.0) {
                 this.otherResources[i] = -1 * this.otherResources[i];
             }
         }
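
To make the effect of updateForRareResourceAffinity concrete, a simplified worked example (plain arrays standing in for the internal otherResources arrays, not the real NormalizedResources API):

    double[] nodeOther    = {2.0};   // the node offers 2 units of "my.gpu"
    double[] requestOther = {0.0};   // the request does not ask for "my.gpu" at all

    for (int i = 0; i < Math.min(nodeOther.length, requestOther.length); i++) {
        if (requestOther[i] == 0.0) {
            nodeOther[i] = -1 * nodeOther[i];   // node now sorts as if it had -2 GPUs
        }
    }
    // A GPU-free request therefore prefers GPU-free nodes, leaving GPU nodes available
    // for topologies that need them (this is what testAntiAffinityWithMultipleTopologies exercises).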

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
index 5a71db4..4a4797b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -65,7 +65,7 @@ public class ResourceMapArrayBridge {
     }
 
     /**
-     * Create an array that has all values 0;
+     * Create an array that has all values 0.
      * @return the empty array.
      */
     public double[] empty() {

http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index ce98abd..0c85e1c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -44,6 +44,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
         final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
         final ExistingScheduleFunc existingScheduleFunc) {
         AllResources affinityBasedAllResources = new AllResources(allResources);
+        NormalizedResourceRequest requestedResources = topologyDetails.getTotalResources(exec);
+        for (ObjectResources objectResources : affinityBasedAllResources.objectResources) {
+            objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
+        }
 
         TreeSet<ObjectResources> sortedObjectResources =
             new TreeSet<>((o1, o2) -> {


[4/5] storm git commit: Addressed review comments

Posted by bo...@apache.org.
Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/558e9b67
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/558e9b67
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/558e9b67

Branch: refs/heads/master
Commit: 558e9b671a7904aec5798cbac6cbdd2aca65b488
Parents: d0be8ae
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue May 8 09:56:22 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue May 8 09:56:22 2018 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/scheduler/Cluster.java         | 3 +++
 .../main/java/org/apache/storm/scheduler/ISchedulingState.java    | 1 +
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/558e9b67/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 0fc78e2..ab95bd2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -47,6 +47,9 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The current state of the storm cluster.  Cluster is not currently thread safe.
+ */
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
     private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/558e9b67/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index b0d94b5..d96109c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -29,6 +29,7 @@ import org.apache.storm.scheduler.resource.normalization.NormalizedResourceReque
 
 /**
  * An interface that provides access to the current scheduling state.
+ * The scheduling state is not guaranteed to be thread safe.
  */
 public interface ISchedulingState {
 


[2/5] storm git commit: STORM-3040: Improve scheduler performance

Posted by bo...@apache.org.
STORM-3040: Improve scheduler performance


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/32392bc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/32392bc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/32392bc8

Branch: refs/heads/master
Commit: 32392bc8e19a21a7d8c22248fedf42673472acbc
Parents: 02639f4
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Apr 24 15:19:32 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri May 4 13:59:04 2018 -0500

----------------------------------------------------------------------
 pom.xml                                         |  11 +-
 .../apache/storm/testing/PerformanceTest.java   |  39 ++
 .../org/apache/storm/scheduler/Cluster.java     |  62 +--
 .../scheduler/SchedulerAssignmentImpl.java      |  46 +-
 .../storm/scheduler/resource/RAS_Node.java      |  60 +--
 .../normalization/NormalizedResourceOffer.java  |  37 +-
 .../NormalizedResourceRequest.java              |  28 +-
 .../normalization/NormalizedResources.java      |  33 +-
 .../NormalizedResourcesWithMemory.java          |   5 +
 .../normalization/ResourceMapArrayBridge.java   |   8 +
 .../scheduling/BaseResourceAwareStrategy.java   | 426 ++++++++++++-------
 .../scheduling/ConstraintSolverStrategy.java    |  23 +-
 .../DefaultResourceAwareStrategy.java           |  14 +-
 .../GenericResourceAwareStrategy.java           |  30 +-
 .../resource/TestResourceAwareScheduler.java    | 137 ++++--
 .../TestUtilsForResourceAwareScheduler.java     |  92 +++-
 .../TestDefaultResourceAwareStrategy.java       |   4 +-
 .../TestGenericResourceAwareStrategy.java       |  69 ++-
 18 files changed, 783 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c486ac6..870d909 100644
--- a/pom.xml
+++ b/pom.xml
@@ -328,7 +328,7 @@
 
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
-        <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+        <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>
         <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
         <java.integration.test.include>no.tests</java.integration.test.include>
         <!-- by default the clojure test set are all clojure tests that are not integration tests. This property is overridden in the profiles -->
@@ -626,10 +626,19 @@
             <id>all-tests</id>
             <properties>
                 <java.integration.test.include>**/*.java</java.integration.test.include>
+                <!-- add perf tests back in -->
+                <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
                 <clojure.test.set>*.*</clojure.test.set>
             </properties>
         </profile>
         <profile>
+            <id>performance-tests</id>
+            <properties>
+                <!-- add perf tests back in -->
+                <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
+            </properties>
+        </profile> 
+	<profile>
             <id>integration-tests-only</id>
             <properties>
                 <!--Java-->
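
For reference: the new performance-tests profile only overrides java.unit.test.exclude so that PerformanceTest-marked classes are no longer filtered out (the all-tests profile does the same, in addition to enabling integration tests); presumably it is activated in the usual Maven way, e.g. mvn test -Pperformance-tests.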

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
new file mode 100644
index 0000000..e657dd8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.testing;
+
+/**
+ * Marker interface used to mark performance tests. Performance tests will be run if the profile performance-tests or all-tests are enabled.
+ * <p/>
+ * Performance tests can be in the same package as unit tests. To mark a test as a performance test,
+ * add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
+ * For example:
+ * <p/>
+ *
+ *
+ * @ Category(PerformanceTest.class)<br/>
+ * public class MyPerformanceTest {<br/>
+ *  ...<br/>
+ * }
+ *
+ *  In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
+ *  for running on CI systems like travis ci, or the apache jenkins build.
+ */
+public interface PerformanceTest {
+}
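
A minimal sketch of what a test marked with this interface might look like; the class and method names here are hypothetical, and the time limit follows the javadoc's advice to keep it liberal for CI machines:

    import org.apache.storm.testing.PerformanceTest;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    @Category(PerformanceTest.class)
    public class MySchedulerPerformanceTest {
        // Liberal limit so slower CI machines (travis, apache jenkins) do not flake.
        @Test(timeout = 120_000)
        public void schedulesLargeClusterQuicklyEnough() {
            // ... build a large simulated cluster and time the scheduler ...
        }
    }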

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index bc08579..0fc78e2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
 
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
+    private static final Function<String, Set<WorkerSlot>> MAKE_SET = (x) -> new HashSet<>();
+    private static final Function<String, Map<WorkerSlot, NormalizedResourceRequest>> MAKE_MAP = (x) -> new HashMap<>();
+
     /**
      * key: supervisor id, value: supervisor details.
      */
@@ -72,6 +76,7 @@ public class Cluster implements ISchedulingState {
     private final Topologies topologies;
     private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> nodeToScheduledResourcesCache;
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
+    private final Map<String, NormalizedResourceRequest> totalResourcesPerNodeCache = new HashMap<>();
     private SchedulerAssignmentImpl assignment;
     private Set<String> blackListedHosts = new HashSet<>();
     private INimbus inimbus;
@@ -147,7 +152,7 @@ public class Cluster implements ISchedulingState {
         this.conf = conf;
         this.topologies = topologies;
 
-        ArrayList<String> supervisorHostNames = new ArrayList<String>();
+        ArrayList<String> supervisorHostNames = new ArrayList<>();
         for (SupervisorDetails s : supervisors.values()) {
             supervisorHostNames.add(s.getHost());
         }
@@ -294,7 +299,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public List<TopologyDetails> needsSchedulingTopologies() {
-        List<TopologyDetails> ret = new ArrayList<TopologyDetails>();
+        List<TopologyDetails> ret = new ArrayList<>();
         for (TopologyDetails topology : getTopologies()) {
             if (needsScheduling(topology)) {
                 ret.add(topology);
@@ -349,8 +354,10 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
-        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> new HashSet<>()).stream().map(WorkerSlot::getPort)
-                                   .collect(Collectors.toSet());
+        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), MAKE_SET)
+            .stream()
+            .map(WorkerSlot::getPort)
+            .collect(Collectors.toSet());
     }
 
     @Override
@@ -374,7 +381,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public List<WorkerSlot> getAvailableSlots() {
-        List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
+        List<WorkerSlot> slots = new ArrayList<>();
         for (SupervisorDetails supervisor : this.supervisors.values()) {
             slots.addAll(this.getAvailableSlots(supervisor));
         }
@@ -385,7 +392,7 @@ public class Cluster implements ISchedulingState {
     @Override
     public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
         Set<Integer> ports = this.getAvailablePorts(supervisor);
-        List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+        List<WorkerSlot> slots = new ArrayList<>(ports.size());
 
         for (Integer port : ports) {
             slots.add(new WorkerSlot(supervisor.getId(), port));
@@ -397,7 +404,7 @@ public class Cluster implements ISchedulingState {
     @Override
     public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
         Set<Integer> ports = this.getAssignablePorts(supervisor);
-        List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+        List<WorkerSlot> slots = new ArrayList<>(ports.size());
 
         for (Integer port : ports) {
             slots.add(new WorkerSlot(supervisor.getId(), port));
@@ -408,7 +415,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public List<WorkerSlot> getAssignableSlots() {
-        List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
+        List<WorkerSlot> slots = new ArrayList<>();
         for (SupervisorDetails supervisor : this.supervisors.values()) {
             slots.addAll(this.getAssignableSlots(supervisor));
         }
@@ -419,7 +426,7 @@ public class Cluster implements ISchedulingState {
     @Override
     public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology) {
         if (topology == null) {
-            return new ArrayList<ExecutorDetails>(0);
+            return new ArrayList<>(0);
         }
 
         Collection<ExecutorDetails> ret = new HashSet<>(topology.getExecutors());
@@ -441,7 +448,7 @@ public class Cluster implements ISchedulingState {
             return 0;
         }
 
-        Set<WorkerSlot> slots = new HashSet<WorkerSlot>();
+        Set<WorkerSlot> slots = new HashSet<>();
         slots.addAll(assignment.getExecutorToSlot().values());
         return slots.size();
     }
@@ -608,6 +615,7 @@ public class Cluster implements ISchedulingState {
         double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
         assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
         updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
+        totalResourcesPerNodeCache.remove(slot.getNodeId());
     }
 
     /**
@@ -676,10 +684,12 @@ public class Cluster implements ISchedulingState {
                 String nodeId = slot.getNodeId();
                 assignment.setTotalSharedOffHeapMemory(
                     nodeId, calculateSharedOffHeapMemory(nodeId, assignment));
-                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).put(slot, new NormalizedResourceRequest());
-                nodeToUsedSlotsCache.computeIfAbsent(nodeId, (x) -> new HashSet<>()).remove(slot);
+                nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(slot, new NormalizedResourceRequest());
+                nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).remove(slot);
             }
         }
+        //Invalidate the cache as something on the node changed
+        totalResourcesPerNodeCache.remove(slot.getNodeId());
     }
 
     /**
@@ -697,7 +707,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public boolean isSlotOccupied(WorkerSlot slot) {
-        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), (x) -> new HashSet<>()).contains(slot);
+        return nodeToUsedSlotsCache.computeIfAbsent(slot.getNodeId(), MAKE_SET).contains(slot);
     }
 
     @Override
@@ -731,7 +741,7 @@ public class Cluster implements ISchedulingState {
     @Override
     public List<SupervisorDetails> getSupervisorsByHost(String host) {
         List<String> nodeIds = this.hostToId.get(host);
-        List<SupervisorDetails> ret = new ArrayList<SupervisorDetails>();
+        List<SupervisorDetails> ret = new ArrayList<>();
 
         if (nodeIds != null) {
             for (String nodeId : nodeIds) {
@@ -744,7 +754,7 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public Map<String, SchedulerAssignment> getAssignments() {
-        return new HashMap<String, SchedulerAssignment>(assignments);
+        return new HashMap<>(assignments);
     }
 
     /**
@@ -763,6 +773,7 @@ public class Cluster implements ISchedulingState {
             assertValidTopologyForModification(assignment.getTopologyId());
         }
         assignments.clear();
+        totalResourcesPerNodeCache.clear();
         nodeToScheduledResourcesCache.values().forEach(Map::clear);
         nodeToUsedSlotsCache.values().forEach(Set::clear);
         for (SchedulerAssignment assignment : newAssignments.values()) {
@@ -916,28 +927,27 @@ public class Cluster implements ISchedulingState {
         return ret;
     }
 
-
     /**
-     * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot
-     *
-     * @param workerSlot
-     * @param workerResources
-     * @param sharedoffHeapMemory
+     * This medhod updates ScheduledResources and UsedSlots cache for given workerSlot.
      */
     private void updateCachesForWorkerSlot(WorkerSlot workerSlot, WorkerResources workerResources, Double sharedoffHeapMemory) {
         String nodeId = workerSlot.getNodeId();
         NormalizedResourceRequest normalizedResourceRequest = new NormalizedResourceRequest();
         normalizedResourceRequest.add(workerResources);
         normalizedResourceRequest.addOffHeap(sharedoffHeapMemory);
-        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).put(workerSlot, normalizedResourceRequest);
-        nodeToUsedSlotsCache.computeIfAbsent(nodeId, (x) -> new HashSet<>()).add(workerSlot);
+        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).put(workerSlot, normalizedResourceRequest);
+        nodeToUsedSlotsCache.computeIfAbsent(nodeId, MAKE_SET).add(workerSlot);
     }
 
     @Override
     public NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId) {
-        NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
-        nodeToScheduledResourcesCache.computeIfAbsent(nodeId, (x) -> new HashMap<>()).values().forEach(totalScheduledResources::add);
-        return totalScheduledResources;
+        return totalResourcesPerNodeCache.computeIfAbsent(nodeId, (nid) -> {
+            NormalizedResourceRequest totalScheduledResources = new NormalizedResourceRequest();
+            for (NormalizedResourceRequest req : nodeToScheduledResourcesCache.computeIfAbsent(nodeId, MAKE_MAP).values()) {
+                totalScheduledResources.add(req);
+            }
+            return totalScheduledResources;
+        });
     }
 
     @Override
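
The Cluster change above adds a per-node total-resources cache that is computed lazily in getAllScheduledResourcesForNode and invalidated whenever a slot on that node is assigned or freed. A simplified, self-contained sketch of that pattern (Storm's types reduced to a single double per slot purely for illustration; the class and method names are not Storm's):

    import java.util.HashMap;
    import java.util.Map;

    class PerNodeTotalsCache {
        // nodeId -> slotId -> scheduled amount (stand-in for NormalizedResourceRequest)
        private final Map<String, Map<String, Double>> scheduledPerNode = new HashMap<>();
        // nodeId -> cached total, filled lazily
        private final Map<String, Double> totalPerNode = new HashMap<>();

        double totalScheduledFor(String nodeId) {
            return totalPerNode.computeIfAbsent(nodeId, nid ->
                scheduledPerNode.getOrDefault(nid, new HashMap<>())
                    .values().stream().mapToDouble(Double::doubleValue).sum());
        }

        void assignSlot(String nodeId, String slotId, double amount) {
            scheduledPerNode.computeIfAbsent(nodeId, n -> new HashMap<>()).put(slotId, amount);
            totalPerNode.remove(nodeId);   // something on the node changed: invalidate its total
        }
    }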

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index dcb2ebc..077dafe 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -26,12 +26,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SchedulerAssignmentImpl implements SchedulerAssignment {
     private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
+    private static Function<WorkerSlot, Collection<ExecutorDetails>> MAKE_LIST = (k) -> new LinkedList<>();
 
     /**
      * topology-id this assignment is for.
@@ -44,8 +46,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
     private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
     private final Map<String, Double> nodeIdToTotalSharedOffHeap = new HashMap<>();
-    //Used to cache the slotToExecutors mapping.
-    private Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutorsCache = null;
+    private final Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors = new HashMap<>();
 
     /**
      * Create a new assignment.
@@ -63,6 +64,9 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
                 throw new RuntimeException("Cannot create a scheduling with a null in it " + executorToSlot);
             }
             this.executorToSlot.putAll(executorToSlot);
+            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
+                slotToExecutors.computeIfAbsent(entry.getValue(), MAKE_LIST).add(entry.getKey());
+            }
         }
         if (resources != null) {
             if (resources.entrySet().stream().anyMatch((entry) -> entry.getKey() == null || entry.getValue() == null)) {
@@ -149,32 +153,26 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
         for (ExecutorDetails executor : executors) {
             this.executorToSlot.put(executor, slot);
         }
+        slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
+            .addAll(executors);
         if (slotResources != null) {
             resources.put(slot, slotResources);
         } else {
             resources.remove(slot);
         }
-        //Clear the cache scheduling changed
-        slotToExecutorsCache = null;
     }
 
     /**
      * Release the slot occupied by this assignment.
      */
     public void unassignBySlot(WorkerSlot slot) {
-        //Clear the cache scheduling is going to change
-        slotToExecutorsCache = null;
-        List<ExecutorDetails> executors = new ArrayList<>();
-        for (ExecutorDetails executor : executorToSlot.keySet()) {
-            WorkerSlot ws = executorToSlot.get(executor);
-            if (ws.equals(slot)) {
-                executors.add(executor);
-            }
-        }
+        Collection<ExecutorDetails> executors = slotToExecutors.remove(slot);
 
         // remove
-        for (ExecutorDetails executor : executors) {
-            executorToSlot.remove(executor);
+        if (executors != null) {
+            for (ExecutorDetails executor : executors) {
+                executorToSlot.remove(executor);
+            }
         }
 
         resources.remove(slot);
@@ -194,7 +192,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     @Override
     public boolean isSlotOccupied(WorkerSlot slot) {
-        return this.executorToSlot.containsValue(slot);
+        return this.slotToExecutors.containsKey(slot);
     }
 
     @Override
@@ -219,21 +217,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     @Override
     public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors() {
-        Map<WorkerSlot, Collection<ExecutorDetails>> ret = slotToExecutorsCache;
-        if (ret != null) {
-            return ret;
-        }
-        ret = new HashMap<>();
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
-            ExecutorDetails exec = entry.getKey();
-            WorkerSlot ws = entry.getValue();
-            if (!ret.containsKey(ws)) {
-                ret.put(ws, new LinkedList<ExecutorDetails>());
-            }
-            ret.get(ws).add(exec);
-        }
-        slotToExecutorsCache = ret;
-        return ret;
+        return slotToExecutors;
     }
 
     @Override
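
Note on the SchedulerAssignmentImpl change above: the lazily rebuilt slotToExecutorsCache is replaced by a slotToExecutors map kept in sync as executors are assigned and unassigned, so getSlotToExecutors() no longer rebuilds the mapping on every call and isSlotOccupied() becomes a key lookup instead of a containsValue() scan over every executor.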

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 4f43c0a..3a9570e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -31,6 +31,7 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,7 +128,7 @@ public class RAS_Node {
     }
 
     private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
-        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+        Collection<WorkerSlot> ret = new LinkedList<>();
         for (String workerId : workerIds) {
             ret.add(slots.get(workerId));
         }
@@ -136,26 +137,15 @@ public class RAS_Node {
 
     public Collection<String> getFreeSlotsId() {
         if (!isAlive) {
-            return new HashSet<String>();
+            return new HashSet<>();
         }
-        Collection<String> usedSlotsId = getUsedSlotsId();
-        Set<String> ret = new HashSet<>();
-        ret.addAll(slots.keySet());
-        ret.removeAll(usedSlotsId);
+        Set<String> ret = new HashSet<>(slots.keySet());
+        ret.removeAll(getUsedSlotsId());
         return ret;
     }
 
-    public Collection<WorkerSlot> getSlotsAvailbleTo(TopologyDetails td) {
-        //Try to reuse a slot if possible....
-        HashSet<WorkerSlot> ret = new HashSet<>();
-        Map<String, Collection<ExecutorDetails>> assigned = topIdToUsedSlots.get(td.getId());
-        if (assigned != null) {
-            ret.addAll(workerIdsToWorkers(assigned.keySet()));
-        }
-        ret.addAll(getFreeSlots());
-        ret.retainAll(
-            originallyFreeSlots); //RAS does not let you move things or modify existing assignments
-        return ret;
+    public Collection<WorkerSlot> getSlotsAvailableToScheduleOn() {
+        return originallyFreeSlots;
     }
 
     public Collection<WorkerSlot> getFreeSlots() {
@@ -163,7 +153,7 @@ public class RAS_Node {
     }
 
     private Collection<String> getUsedSlotsId() {
-        Collection<String> ret = new LinkedList<String>();
+        Collection<String> ret = new LinkedList<>();
         for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
             ret.addAll(entry.keySet());
         }
@@ -329,16 +319,17 @@ public class RAS_Node {
         cluster.assign(target, td.getId(), executors);
 
         //assigning internally
-        if (!topIdToUsedSlots.containsKey(td.getId())) {
-            topIdToUsedSlots.put(td.getId(), new HashMap<String, Collection<ExecutorDetails>>());
-        }
-
-        if (!topIdToUsedSlots.get(td.getId()).containsKey(target.getId())) {
-            topIdToUsedSlots.get(td.getId()).put(target.getId(), new LinkedList<ExecutorDetails>());
-        }
-        topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
+        topIdToUsedSlots.computeIfAbsent(td.getId(), (tid) -> new HashMap<>())
+            .computeIfAbsent(target.getId(), (tid) -> new LinkedList<>())
+            .addAll(executors);
     }
 
+    /**
+     * Assign a single executor to a slot, even if other things are in the slot.
+     * @param ws the slot to assign it to.
+     * @param exec the executor to assign.
+     * @param td the topology for the executor.
+     */
     public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
         if (!isAlive) {
             throw new IllegalStateException("Trying to add to a dead node " + nodeId);
@@ -372,9 +363,7 @@ public class RAS_Node {
      * @return true if it would fit else false
      */
     public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
-        if (!nodeId.equals(ws.getNodeId())) {
-            throw new IllegalStateException("Slot " + ws + " is not a part of this node " + nodeId);
-        }
+        assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
         return isAlive
                && cluster.wouldFit(
             ws,
@@ -385,6 +374,19 @@ public class RAS_Node {
         );
     }
 
+    /**
+     * Check whether there is any possibility that exec could ever fit on this node.
+     * @param exec the executor to schedule
+     * @param td the topology the executor is a part of
+     * @return true if there is a possibility it might fit (with no guarantee that it will), or false if there is
+     *     no way it would ever fit.
+     */
+    public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
+        NormalizedResourceOffer avail = getTotalAvailableResources();
+        NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
+        return isAlive && avail.couldHoldIgnoringSharedMemory(requestedResources);
+    }
+
     @Override
     public boolean equals(Object other) {
         if (other instanceof RAS_Node) {

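The new couldEverFit is a cheap node-level filter run before the per-slot wouldFit loop. A rough standalone sketch of the same idea, using simplified resource fields rather than the NormalizedResources types:

    class NodeCapacitySketch {
        boolean alive = true;
        double availableCpu;
        double availableMemoryMb;

        boolean couldEverFit(double requestedCpu, double requestedMemoryMb) {
            // Passing this check is no guarantee that any single slot fits the request;
            // failing it means the per-slot loop can be skipped for this node entirely.
            return alive && availableCpu >= requestedCpu && availableMemoryMb >= requestedMemoryMb;
        }
    }
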
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 0c073a8..790a42b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -43,10 +43,18 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
         this.normalizedResources = new NormalizedResources(normalizedResourceMap);
     }
 
+    /**
+     * Create an offer with all resources set to 0.
+     */
     public NormalizedResourceOffer() {
-        this((Map<String, ? extends Number>) null);
+        normalizedResources = new NormalizedResources();
+        totalMemoryMb = 0.0;
     }
 
+    /**
+     * Copy Constructor.
+     * @param other what to copy.
+     */
     public NormalizedResourceOffer(NormalizedResourceOffer other) {
         this.totalMemoryMb = other.totalMemoryMb;
         this.normalizedResources = new NormalizedResources(other.normalizedResources);
@@ -57,6 +65,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
         return totalMemoryMb;
     }
 
+    /**
+     * Return these resources as a normalized map.
+     * @return the normalized map.
+     */
     public Map<String, Double> toNormalizedMap() {
         Map<String, Double> ret = normalizedResources.toNormalizedMap();
         ret.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb);
@@ -68,6 +80,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
         totalMemoryMb += other.getTotalMemoryMb();
     }
 
+    /**
+     * Remove the resources in other from this.
+     * @param other what to remove.
+     */
     public void remove(NormalizedResourcesWithMemory other) {
         normalizedResources.remove(other.getNormalizedResources());
         totalMemoryMb -= other.getTotalMemoryMb();
@@ -75,10 +91,10 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
             normalizedResources.throwBecauseResourceBecameNegative(
                 Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, other.getTotalMemoryMb());
         }
-        ;
     }
 
     /**
+     * Calculate the average percentage used.
      * @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
      *     double, double).
      */
@@ -88,6 +104,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     }
 
     /**
+     * Calculate the min percentage used of the resource.
      * @see NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
      *     double)
      */
@@ -96,6 +113,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     }
 
     /**
+     * Check if resources might be able to fit.
      * @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
      *     double).
      */
@@ -112,4 +130,19 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
     public NormalizedResources getNormalizedResources() {
         return normalizedResources;
     }
+
+    @Override
+    public String toString() {
+        return "Normalized resources: " + toNormalizedMap();
+    }
+
+    public void updateForRareResourceAffinity(NormalizedResourceRequest requestedResources) {
+        normalizedResources.updateForRareResourceAffinity(requestedResources.getNormalizedResources());
+    }
+
+    @Override
+    public void clear() {
+        this.totalMemoryMb = 0.0;
+        this.normalizedResources.clear();
+    }
 }

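The zero-argument constructor and the new clear() avoid going through map normalization when an offer only needs to be zeroed. A hypothetical accumulator (not the Storm class) showing the reuse pattern this enables, assuming the caller holds one scratch object across iterations of a scheduling loop:

    class ScratchOfferSketch {
        private double cpu;
        private double memoryMb;

        void add(double cpuDelta, double memoryMbDelta) {
            cpu += cpuDelta;
            memoryMb += memoryMbDelta;
        }

        void clear() {
            // reset to zero so the same instance can be reused without reallocating
            cpu = 0.0;
            memoryMb = 0.0;
        }

        @Override
        public String toString() {
            return "cpu=" + cpu + " memoryMb=" + memoryMb;
        }
    }
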
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index dc380e6..86033b8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -43,15 +43,24 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
 
     private NormalizedResourceRequest(Map<String, ? extends Number> resources,
                                       Map<String, Double> defaultResources) {
-        Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
-        normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
-        onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
-        normalizedResources = new NormalizedResources(normalizedResourceMap);
+        if (resources == null && defaultResources == null) {
+            onHeap = 0.0;
+            offHeap = 0.0;
+            normalizedResources = new NormalizedResources();
+        } else {
+            Map<String, Double> normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER
+                .normalizedResourceMap(defaultResources);
+            normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+            onHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0);
+            offHeap = normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0);
+            normalizedResources = new NormalizedResources(normalizedResourceMap);
+        }
     }
+
     public NormalizedResourceRequest(ComponentCommon component, Map<String, Object> topoConf) {
         this(parseResources(component.get_json_conf()), getDefaultResources(topoConf));
     }
+
     public NormalizedResourceRequest(Map<String, Object> topoConf) {
         this((Map<String, ? extends Number>) null, getDefaultResources(topoConf));
     }
@@ -174,7 +183,7 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
 
     @Override
     public String toString() {
-        return super.toString() + " onHeap: " + onHeap + " offHeap: " + offHeap;
+        return "Normalized resources: " + toNormalizedMap();
     }
 
     public double getTotalCpu() {
@@ -185,4 +194,11 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
     public NormalizedResources getNormalizedResources() {
         return this.normalizedResources;
     }
+
+    @Override
+    public void clear() {
+        normalizedResources.clear();
+        offHeap = 0.0;
+        onHeap = 0.0;
+    }
 }

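The constructor now short-circuits when both resource maps are null, skipping normalization entirely. A small standalone sketch of the same guard (hypothetical helper, simplified types):

    import java.util.HashMap;
    import java.util.Map;

    final class RequestNormalizationSketch {
        static Map<String, Double> merge(Map<String, ? extends Number> resources, Map<String, Double> defaults) {
            if (resources == null && defaults == null) {
                return new HashMap<>();          // nothing to merge, so do no work
            }
            Map<String, Double> merged = new HashMap<>();
            if (defaults != null) {
                merged.putAll(defaults);
            }
            if (resources != null) {
                resources.forEach((k, v) -> merged.put(k, v.doubleValue()));
            }
            return merged;
        }
    }
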
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index c41cd95..e57d8f2 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -45,6 +45,14 @@ public class NormalizedResources {
     private double[] otherResources;
 
     /**
+     * Create an empty NormalizedResources.
+     */
+    NormalizedResources() {
+        cpu = 0.0;
+        otherResources = RESOURCE_MAP_ARRAY_BRIDGE.empty();
+    }
+
+    /**
      * Copy constructor.
      */
     public NormalizedResources(NormalizedResources other) {
@@ -224,12 +232,6 @@ public class NormalizedResources {
      *                                  resources that are not present in the total.
      */
     public double calculateAveragePercentageUsedBy(NormalizedResources used, double totalMemoryMb, double usedMemoryMb) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Calculating avg percentage used by. Used Mem: {} Total Mem: {}"
-                      + " Used Normalized Resources: {} Total Normalized Resources: {}", totalMemoryMb, usedMemoryMb,
-                      toNormalizedMap(), used.toNormalizedMap());
-        }
-
         int skippedResourceTypes = 0;
         double total = 0.0;
         if (usedMemoryMb > totalMemoryMb) {
@@ -344,4 +346,23 @@ public class NormalizedResources {
         }
         return min * 100.0;
     }
+
+    public void updateForRareResourceAffinity(NormalizedResources other) {
+        int length = Math.min(this.otherResources.length, other.otherResources.length);
+        for (int i = 0; i < length; i++) {
+            if (other.getResourceAt(i) == 0.0) {
+                this.otherResources[i] = -1 * this.otherResources[i];
+            }
+        }
+    }
+
+    /**
+     * Set all resources to 0.
+     */
+    public void clear() {
+        this.cpu = 0.0;
+        for (int i = 0; i < otherResources.length; i++) {
+            otherResources[i] = 0.0;
+        }
+    }
 }

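updateForRareResourceAffinity flips the sign of any resource the request does not ask for; the likely intent is that a sort preferring higher availability will then steer such executors away from nodes holding scarce resources they would not use. A standalone sketch of the array manipulation, assuming both arrays share the same resource indices:

    // nodeAvailable and requested are parallel arrays indexed by resource type.
    static void updateForRareResourceAffinity(double[] nodeAvailable, double[] requested) {
        int length = Math.min(nodeAvailable.length, requested.length);
        for (int i = 0; i < length; i++) {
            if (requested[i] == 0.0) {
                // the executor does not need this resource, so count it against the node
                nodeAvailable[i] = -nodeAvailable[i];
            }
        }
    }
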
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
index 5001645..aeb0737 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourcesWithMemory.java
@@ -25,4 +25,9 @@ public interface NormalizedResourcesWithMemory {
 
     double getTotalMemoryMb();
 
+    /**
+     * Set all resources to 0.
+     */
+    void clear();
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
index 2b07c65..5a71db4 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -65,6 +65,14 @@ public class ResourceMapArrayBridge {
     }
 
     /**
+     * Create an array that has all values set to 0.
+     * @return the empty array.
+     */
+    public double[] empty() {
+        return new double[counter.get()];
+    }
+
+    /**
      * Translates an array of resource values to a normalized resource map.
      *
      * @param resources The resource array to translate

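empty() hands back a zero-filled array sized to the resource names registered so far, avoiding a trip through a normalized map. A simplified sketch of the name-to-index bridge that makes a plain double[] usable this way (hypothetical class, not Storm's implementation):

    import java.util.HashMap;
    import java.util.Map;

    class ResourceIndexSketch {
        private final Map<String, Integer> indices = new HashMap<>();

        int indexOf(String resourceName) {
            Integer idx = indices.get(resourceName);
            if (idx == null) {
                idx = indices.size();          // names get stable, increasing indices
                indices.put(resourceName, idx);
            }
            return idx;
        }

        double[] empty() {
            return new double[indices.size()]; // Java zero-initializes the array
        }
    }
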
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index a474ccc..7e50f4a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -24,35 +24,64 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.storm.generated.ComponentType;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Component;
 import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class BaseResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
     protected Cluster cluster;
-    protected RAS_Nodes nodes;
+    // Rack id to list of host names in that rack
     private Map<String, List<String>> networkTopography;
+    private final Map<String, String> superIdToRack = new HashMap<>();
+    private final Map<String, String> superIdToHostname = new HashMap<>();
+    private final Map<String, List<RAS_Node>> hostnameToNodes = new HashMap<>();
+    private final Map<String, List<RAS_Node>> rackIdToNodes = new HashMap<>();
+    protected RAS_Nodes nodes;
 
     @VisibleForTesting
     void prepare(Cluster cluster) {
         this.cluster = cluster;
         nodes = new RAS_Nodes(cluster);
         networkTopography = cluster.getNetworkTopography();
+        Map<String, String> hostToRack = new HashMap<>();
+        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+            String rackId = entry.getKey();
+            for (String hostName: entry.getValue()) {
+                hostToRack.put(hostName, rackId);
+            }
+        }
+        for (RAS_Node node: nodes.getNodes()) {
+            String superId = node.getId();
+            String hostName = node.getHostname();
+            String rackId = hostToRack.getOrDefault(hostName, DNSToSwitchMapping.DEFAULT_RACK);
+            superIdToHostname.put(superId, hostName);
+            superIdToRack.put(superId, rackId);
+            hostnameToNodes.computeIfAbsent(hostName, (hn) -> new ArrayList<>()).add(node);
+            rackIdToNodes.computeIfAbsent(rackId, (hn) -> new ArrayList<>()).add(node);
+        }
         logClusterInfo();
     }
 
@@ -61,15 +90,22 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
         //NOOP
     }
 
+    protected SchedulingResult mkNotEnoughResources(TopologyDetails td) {
+        return SchedulingResult.failure(
+            SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+            td.getExecutors().size() + " executors not scheduled");
+    }
+
     /**
      * Schedule executor exec from topology td.
      *
      * @param exec           the executor to schedule
      * @param td             the topology executor exec is a part of
      * @param scheduledTasks executors that have been scheduled
+     * @return true if scheduled successfully, else false.
      */
-    protected void scheduleExecutor(
-        ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, List<ObjectResources> sortedNodes) {
+    protected boolean scheduleExecutor(
+            ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks, Iterable<String> sortedNodes) {
         WorkerSlot targetSlot = findWorkerForExec(exec, td, sortedNodes);
         if (targetSlot != null) {
             RAS_Node targetNode = idToNode(targetSlot.getNodeId());
@@ -86,8 +122,12 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
                 targetNode.getTotalCpuResources(),
                 targetSlot,
                 nodeToRack(targetNode));
+            return true;
         } else {
-            LOG.error("Not Enough Resources to schedule Task {}", exec);
+            String comp = td.getExecutorToComponent().get(exec);
+            NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
+            LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+            return false;
         }
     }
 
@@ -103,12 +143,14 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      * @param td   the topology that the executor is a part of
      * @return a worker to assign exec on. Returns null if a worker cannot be successfully found in cluster
      */
-    protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, List<ObjectResources> sortedNodes) {
-        for (ObjectResources nodeResources : sortedNodes) {
-            RAS_Node node = nodes.getNodeById(nodeResources.id);
-            for (WorkerSlot ws : node.getSlotsAvailbleTo(td)) {
-                if (node.wouldFit(ws, exec, td)) {
-                    return ws;
+    protected WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Iterable<String> sortedNodes) {
+        for (String id : sortedNodes) {
+            RAS_Node node = nodes.getNodeById(id);
+            if (node.couldEverFit(exec, td)) {
+                for (WorkerSlot ws : node.getSlotsAvailableToScheduleOn()) {
+                    if (node.wouldFit(ws, exec, td)) {
+                        return ws;
+                    }
                 }
             }
         }
@@ -133,96 +175,219 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      * @return a sorted list of nodes.
      */
     protected TreeSet<ObjectResources> sortNodes(
-        List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId) {
-        AllResources allResources = new AllResources("RACK");
-        List<ObjectResources> nodes = allResources.objectResources;
+            List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails topologyDetails, String rackId,
+            Map<String, AtomicInteger> scheduledCount) {
+        AllResources allRackResources = new AllResources("RACK");
+        List<ObjectResources> nodes = allRackResources.objectResources;
 
         for (RAS_Node rasNode : availNodes) {
-            String nodeId = rasNode.getId();
-            ObjectResources node = new ObjectResources(nodeId);
+            String superId = rasNode.getId();
+            ObjectResources node = new ObjectResources(superId);
 
             node.availableResources = rasNode.getTotalAvailableResources();
             node.totalResources = rasNode.getTotalResources();
 
             nodes.add(node);
-            allResources.availableResourcesOverall.add(node.availableResources);
-            allResources.totalResourcesOverall.add(node.totalResources);
+            allRackResources.availableResourcesOverall.add(node.availableResources);
+            allRackResources.totalResourcesOverall.add(node.totalResources);
 
         }
 
         LOG.debug(
             "Rack {}: Overall Avail [ {} ] Total [ {} ]",
             rackId,
-            allResources.availableResourcesOverall,
-            allResources.totalResourcesOverall);
+            allRackResources.availableResourcesOverall,
+            allRackResources.totalResourcesOverall);
 
-        String topoId = topologyDetails.getId();
         return sortObjectResources(
-            allResources,
+            allRackResources,
             exec,
             topologyDetails,
-            new ExistingScheduleFunc() {
-                @Override
-                public int getNumExistingSchedule(String objectId) {
-
-                    //Get execs already assigned in rack
-                    Collection<ExecutorDetails> execs = new LinkedList<>();
-                    if (cluster.getAssignmentById(topoId) != null) {
-                        for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
-                            cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
-                            WorkerSlot workerSlot = entry.getValue();
-                            ExecutorDetails exec = entry.getKey();
-                            if (workerSlot.getNodeId().equals(objectId)) {
-                                execs.add(exec);
-                            }
-                        }
-                    }
-                    return execs.size();
+            (superId) -> {
+                AtomicInteger count = scheduledCount.get(superId);
+                if (count == null) {
+                    return 0;
                 }
+                return count.get();
             });
     }
 
-    protected List<ObjectResources> sortAllNodes(TopologyDetails td, ExecutorDetails exec,
-                                                 List<String> favoredNodes, List<String> unFavoredNodes) {
-        TreeSet<ObjectResources> sortedRacks = sortRacks(exec, td);
-        ArrayList<ObjectResources> totallySortedNodes = new ArrayList<>();
-        for (ObjectResources rack : sortedRacks) {
-            final String rackId = rack.id;
-            TreeSet<ObjectResources> sortedNodes = sortNodes(
-                getAvailableNodesFromRack(rackId), exec, td, rackId);
-            totallySortedNodes.addAll(sortedNodes);
+    protected List<String> makeHostToNodeIds(List<String> hosts) {
+        if (hosts == null) {
+            return Collections.emptyList();
+        }
+        List<String> ret = new ArrayList<>(hosts.size());
+        for (String host: hosts) {
+            List<RAS_Node> nodes = hostnameToNodes.get(host);
+            if (nodes != null) {
+                for (RAS_Node node : nodes) {
+                    ret.add(node.getId());
+                }
+            }
+        }
+        return ret;
+    }
+
+    private static class LazyNodeSortingIterator implements Iterator<String> {
+        private final LazyNodeSorting parent;
+        private final Iterator<ObjectResources> rackIterator;
+        private Iterator<ObjectResources> nodeIterator;
+        private String nextValueFromNode = null;
+        private final Iterator<String> pre;
+        private final Iterator<String> post;
+        private final Set<String> skip;
+
+        public LazyNodeSortingIterator(LazyNodeSorting parent,
+                                       TreeSet<ObjectResources> sortedRacks) {
+            this.parent = parent;
+            rackIterator = sortedRacks.iterator();
+            pre = parent.favoredNodeIds.iterator();
+            post = parent.unFavoredNodeIds.iterator();
+            skip = parent.skippedNodeIds;
+        }
+
+        private Iterator<ObjectResources> getNodeIterator() {
+            if (nodeIterator != null && nodeIterator.hasNext()) {
+                return nodeIterator;
+            }
+            //need to get the next node iterator
+            if (rackIterator.hasNext()) {
+                ObjectResources rack = rackIterator.next();
+                final String rackId = rack.id;
+                nodeIterator = parent.getSortedNodesFor(rackId).iterator();
+                return nodeIterator;
+            }
+
+            return null;
         }
-        //Now do some post processing to add make some nodes preferred over others.
-        if (favoredNodes != null || unFavoredNodes != null) {
-            HashMap<String, Integer> hostOrder = new HashMap<>();
-            if (favoredNodes != null) {
-                int size = favoredNodes.size();
-                for (int i = 0; i < size; i++) {
-                    //First in the list is the most desired so gets the Lowest possible value
-                    hostOrder.put(favoredNodes.get(i), -(size - i));
+
+        @Override
+        public boolean hasNext() {
+            if (pre.hasNext()) {
+                return true;
+            }
+            while (true) {
+                //For the nodes we cannot tell if another usable value exists until we look at the contents
+                Iterator<ObjectResources> nodeIterator = getNodeIterator();
+                if (nodeIterator == null || !nodeIterator.hasNext()) {
+                    break;
                 }
+                nextValueFromNode = nodeIterator.next().id;
+                if (!skip.contains(nextValueFromNode)) {
+                    return true;
+                }
+            }
+            if (post.hasNext()) {
+                return true;
             }
-            if (unFavoredNodes != null) {
-                int size = unFavoredNodes.size();
-                for (int i = 0; i < size; i++) {
-                    //First in the list is the least desired so gets the highest value
-                    hostOrder.put(unFavoredNodes.get(i), size - i);
+            return false;
+        }
+
+        @Override
+        public String next() {
+            if (pre.hasNext()) {
+                return pre.next();
+            }
+            if (nextValueFromNode != null) {
+                String tmp = nextValueFromNode;
+                nextValueFromNode = null;
+                return tmp;
+            }
+            return post.next();
+        }
+    }
+
+    private class LazyNodeSorting implements Iterable<String> {
+        private final Map<String, AtomicInteger> perNodeScheduledCount = new HashMap<>();
+        private final TreeSet<ObjectResources> sortedRacks;
+        private final Map<String, TreeSet<ObjectResources>> cachedNodes = new HashMap<>();
+        private final ExecutorDetails exec;
+        private final TopologyDetails td;
+        private final List<String> favoredNodeIds;
+        private final List<String> unFavoredNodeIds;
+        private final Set<String> skippedNodeIds = new HashSet<>();
+
+        public LazyNodeSorting(TopologyDetails td, ExecutorDetails exec,
+                               List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
+            this.favoredNodeIds = favoredNodeIds;
+            this.unFavoredNodeIds = unFavoredNodeIds;
+            this.unFavoredNodeIds.removeAll(favoredNodeIds);
+            skippedNodeIds.addAll(favoredNodeIds);
+            skippedNodeIds.addAll(unFavoredNodeIds);
+
+            this.td = td;
+            this.exec = exec;
+            String topoId = td.getId();
+            SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+            if (assignment != null) {
+                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                    assignment.getSlotToExecutors().entrySet()) {
+                    String superId = entry.getKey().getNodeId();
+                    perNodeScheduledCount.computeIfAbsent(superId, (sid) -> new AtomicInteger(0))
+                        .getAndAdd(entry.getValue().size());
                 }
             }
-            //java guarantees a stable sort so we can just return 0 for values we don't want to move.
-            Collections.sort(totallySortedNodes, (o1, o2) -> {
-                RAS_Node n1 = this.nodes.getNodeById(o1.id);
-                String host1 = n1.getHostname();
-                int h1Value = hostOrder.getOrDefault(host1, 0);
+            sortedRacks = sortRacks(exec, td);
+        }
 
-                RAS_Node n2 = this.nodes.getNodeById(o2.id);
-                String host2 = n2.getHostname();
-                int h2Value = hostOrder.getOrDefault(host2, 0);
+        private TreeSet<ObjectResources> getSortedNodesFor(String rackId) {
+            return cachedNodes.computeIfAbsent(rackId,
+                (rid) -> sortNodes(rackIdToNodes.get(rid), exec, td, rid, perNodeScheduledCount));
+        }
 
-                return Integer.compare(h1Value, h2Value);
-            });
+        @Override
+        public Iterator<String> iterator() {
+            return new LazyNodeSortingIterator(this, sortedRacks);
         }
-        return totallySortedNodes;
+    }
+
+    protected Iterable<String> sortAllNodes(TopologyDetails td, ExecutorDetails exec,
+                                            List<String> favoredNodeIds, List<String> unFavoredNodeIds) {
+        return new LazyNodeSorting(td, exec, favoredNodeIds, unFavoredNodeIds);
+    }
+
+    private AllResources createClusterAllResources() {
+        AllResources allResources = new AllResources("Cluster");
+        List<ObjectResources> racks = allResources.objectResources;
+
+        //This is the first time so initialize the resources.
+        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+            String rackId = entry.getKey();
+            List<String> nodeHosts = entry.getValue();
+            ObjectResources rack = new ObjectResources(rackId);
+            racks.add(rack);
+            for (String nodeHost : nodeHosts) {
+                for (RAS_Node node : hostnameToNodes(nodeHost)) {
+                    rack.availableResources.add(node.getTotalAvailableResources());
+                    rack.totalResources.add(node.getTotalAvailableResources());
+                }
+            }
+
+            allResources.totalResourcesOverall.add(rack.totalResources);
+            allResources.availableResourcesOverall.add(rack.availableResources);
+        }
+
+        LOG.debug(
+            "Cluster Overall Avail [ {} ] Total [ {} ]",
+            allResources.availableResourcesOverall,
+            allResources.totalResourcesOverall);
+        return allResources;
+    }
+
+    private Map<String, AtomicInteger> getScheduledCount(TopologyDetails topologyDetails) {
+        String topoId = topologyDetails.getId();
+        SchedulerAssignment assignment = cluster.getAssignmentById(topoId);
+        Map<String, AtomicInteger> scheduledCount = new HashMap<>();
+        if (assignment != null) {
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry :
+                assignment.getSlotToExecutors().entrySet()) {
+                String superId = entry.getKey().getNodeId();
+                String rackId = superIdToRack.get(superId);
+                scheduledCount.computeIfAbsent(rackId, (rid) -> new AtomicInteger(0))
+                    .getAndAdd(entry.getValue().size());
+            }
+        }
+        return scheduledCount;
     }
 
     /**
@@ -242,55 +407,20 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      */
     @VisibleForTesting
     TreeSet<ObjectResources> sortRacks(ExecutorDetails exec, TopologyDetails topologyDetails) {
-        AllResources allResources = new AllResources("Cluster");
-        List<ObjectResources> racks = allResources.objectResources;
-
-        final Map<String, String> nodeIdToRackId = new HashMap<String, String>();
-
-        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
-            String rackId = entry.getKey();
-            List<String> nodeIds = entry.getValue();
-            ObjectResources rack = new ObjectResources(rackId);
-            racks.add(rack);
-            for (String nodeId : nodeIds) {
-                RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId));
-                rack.availableResources.add(node.getTotalAvailableResources());
-                rack.totalResources.add(node.getTotalAvailableResources());
-
-                nodeIdToRackId.put(nodeId, rack.id);
 
-                allResources.totalResourcesOverall.add(rack.totalResources);
-                allResources.availableResourcesOverall.add(rack.availableResources);
+        final AllResources allResources = createClusterAllResources();
+        final Map<String, AtomicInteger> scheduledCount = getScheduledCount(topologyDetails);
 
-            }
-        }
-        LOG.debug(
-            "Cluster Overall Avail [ {} ] Total [ {} ]",
-            allResources.availableResourcesOverall,
-            allResources.totalResourcesOverall);
-
-        String topoId = topologyDetails.getId();
         return sortObjectResources(
             allResources,
             exec,
             topologyDetails,
-            (objectId) -> {
-                String rackId = objectId;
-                //Get execs already assigned in rack
-                Collection<ExecutorDetails> execs = new LinkedList<>();
-                if (cluster.getAssignmentById(topoId) != null) {
-                    for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
-                        cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
-                        String nodeId = entry.getValue().getNodeId();
-                        String hostname = idToNode(nodeId).getHostname();
-                        ExecutorDetails exec1 = entry.getKey();
-                        if (nodeIdToRackId.get(hostname) != null
-                            && nodeIdToRackId.get(hostname).equals(rackId)) {
-                            execs.add(exec1);
-                        }
-                    }
+            (rackId) -> {
+                AtomicInteger count = scheduledCount.get(rackId);
+                if (count == null) {
+                    return 0;
                 }
-                return execs.size();
+                return count.get();
             });
     }
 
@@ -301,27 +431,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
      * @return the rack id
      */
     protected String nodeToRack(RAS_Node node) {
-        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
-            if (entry.getValue().contains(node.getHostname())) {
-                return entry.getKey();
-            }
-        }
-        LOG.error("Node: {} not found in any racks", node.getHostname());
-        return null;
-    }
-
-    /**
-     * get a list nodes from a rack.
-     *
-     * @param rackId the rack id of the rack to get nodes from
-     * @return a list of nodes
-     */
-    protected List<RAS_Node> getAvailableNodesFromRack(String rackId) {
-        List<RAS_Node> retList = new ArrayList<>();
-        for (String nodeId : networkTopography.get(rackId)) {
-            retList.add(nodes.getNodeById(this.nodeHostnameToId(nodeId)));
-        }
-        return retList;
+        return superIdToRack.get(node.getId());
     }
 
     /**
@@ -459,9 +569,7 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
     }
 
     /**
-     * Get the amount of resources available and total for each node.
-     *
-     * @return a String with cluster resource info for debug
+     * Log the cluster resource information for debugging.
      */
     private void logClusterInfo() {
         if (LOG.isDebugEnabled()) {
@@ -470,40 +578,32 @@ public abstract class BaseResourceAwareStrategy implements IStrategy {
                 String rackId = clusterEntry.getKey();
                 LOG.debug("Rack: {}", rackId);
                 for (String nodeHostname : clusterEntry.getValue()) {
-                    RAS_Node node = idToNode(this.nodeHostnameToId(nodeHostname));
-                    LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
-                    LOG.debug(
-                        "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
-                        node.getAvailableMemoryResources(),
-                        node.getAvailableCpuResources(),
-                        node.totalSlotsFree());
-                    LOG.debug(
-                        "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
-                        node.getTotalMemoryResources(),
-                        node.getTotalCpuResources(),
-                        node.totalSlots());
+                    for (RAS_Node node : hostnameToNodes(nodeHostname)) {
+                        LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
+                        LOG.debug(
+                            "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
+                            node.getAvailableMemoryResources(),
+                            node.getAvailableCpuResources(),
+                            node.totalSlotsFree());
+                        LOG.debug(
+                            "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
+                            node.getTotalMemoryResources(),
+                            node.getTotalCpuResources(),
+                            node.totalSlots());
+                    }
                 }
             }
         }
     }
 
     /**
-     * hostname to Id.
+     * Find the nodes on a given hostname.
      *
-     * @param hostname the hostname to convert to node id
-     * @return the id of a node
+     * @param hostname the hostname.
+     * @return the nodes on that host.
      */
-    public String nodeHostnameToId(String hostname) {
-        for (RAS_Node n : nodes.getNodes()) {
-            if (n.getHostname() == null) {
-                continue;
-            }
-            if (n.getHostname().equals(hostname)) {
-                return n.getId();
-            }
-        }
-        LOG.error("Cannot find Node with hostname {}", hostname);
-        return null;
+    public List<RAS_Node> hostnameToNodes(String hostname) {
+        return hostnameToNodes.get(hostname);
     }
 
     /**

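The LazyNodeSorting/LazyNodeSortingIterator pair above replaces the eager sort of every node in the cluster with an ordering that is produced on demand. A minimal, standalone sketch of the same shape (plain String ids and pre-built groups, not the Storm classes): favored ids come first, each group ("rack") is only walked when the consumer reaches it, ids already covered by the favored/unfavored lists are skipped in the middle, and unfavored ids come last.

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.Set;

    class LazyOrderSketch implements Iterable<String> {
        private final List<String> favored;
        private final List<String> unfavored;
        private final List<List<String>> groups;  // assumed already ordered by group priority
        private final Set<String> skip;

        LazyOrderSketch(List<String> favored, List<String> unfavored,
                        List<List<String>> groups, Set<String> skip) {
            this.favored = favored;
            this.unfavored = unfavored;
            this.groups = groups;
            this.skip = skip;
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                private final Iterator<String> pre = favored.iterator();
                private final Iterator<String> post = unfavored.iterator();
                private final Iterator<List<String>> groupIt = groups.iterator();
                private Iterator<String> current = null;
                private String pending = null;

                @Override
                public boolean hasNext() {
                    if (pending != null || pre.hasNext()) {
                        return true;
                    }
                    while (true) {
                        if (current == null || !current.hasNext()) {
                            if (!groupIt.hasNext()) {
                                break;
                            }
                            // in the real code the expensive per-rack sort happens here, on demand
                            current = groupIt.next().iterator();
                            continue;
                        }
                        String candidate = current.next();
                        if (!skip.contains(candidate)) {
                            pending = candidate;
                            return true;
                        }
                    }
                    return post.hasNext();
                }

                @Override
                public String next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    if (pre.hasNext()) {
                        return pre.next();
                    }
                    if (pending != null) {
                        String tmp = pending;
                        pending = null;
                        return tmp;
                    }
                    return post.next();
                }
            };
        }
    }

Because findWorkerForExec returns on the first slot that fits, racks further down the order may never be visited at all, which is presumably where much of the speedup comes from.
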
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 12f89ca..9ccfd50 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -45,16 +45,17 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
     public static final int MAX_STATE_SEARCH = 100_000;
     public static final int DEFAULT_STATE_SEARCH = 10_000;
     private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
-    private Map<String, RAS_Node> nodes;
-    private Map<ExecutorDetails, String> execToComp;
-    private Map<String, Set<ExecutorDetails>> compToExecs;
-    private List<String> favoredNodes;
-    private List<String> unFavoredNodes;
 
     //constraints and spreads
     private Map<String, Map<String, Integer>> constraintMatrix;
     private HashSet<String> spreadComps = new HashSet<>();
 
+    private Map<String, RAS_Node> nodes;
+    private Map<ExecutorDetails, String> execToComp;
+    private Map<String, Set<ExecutorDetails>> compToExecs;
+    private List<String> favoredNodeIds;
+    private List<String> unFavoredNodeIds;
+
     static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
         Map<String, Map<String, Integer>> matrix = new HashMap<>();
         for (String comp : comps) {
@@ -252,8 +253,8 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         final long maxTimeMs =
             ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
 
-        favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
-        unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+        favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
 
         //get mapping of execs to components
         execToComp = td.getExecutorToComponent();
@@ -328,11 +329,11 @@ public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
         }
 
         ExecutorDetails exec = state.currentExec();
-        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes);
+        Iterable<String> sortedNodes = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
 
-        for (ObjectResources nodeResources : sortedNodes) {
-            RAS_Node node = nodes.get(nodeResources.id);
-            for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
+        for (String nodeId: sortedNodes) {
+            RAS_Node node = nodes.get(nodeId);
+            for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
                 if (isExecAssignmentToWorkerValid(workerSlot, state)) {
                     state.tryToSchedule(execToComp, node, workerSlot);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 826c24d..a832557 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -60,9 +60,9 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
         //order executors to be scheduled
         List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
         Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
-        List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
-        List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
-        final List<ObjectResources> sortedNodes = sortAllNodes(td, null, favoredNodes, unFavoredNodes);
+        List<String> favoredNodesIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        List<String> unFavoredNodesIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
+        final Iterable<String> sortedNodes = sortAllNodes(td, null, favoredNodesIds, unFavoredNodesIds);
 
         for (ExecutorDetails exec : orderedExecutors) {
             LOG.debug(
@@ -70,14 +70,18 @@ public class DefaultResourceAwareStrategy extends BaseResourceAwareStrategy impl
                 exec,
                 td.getExecutorToComponent().get(exec),
                 td.getTaskResourceReqList(exec));
-            scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+            if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+                return mkNotEnoughResources(td);
+            }
         }
 
         executorsNotScheduled.removeAll(scheduledTasks);
         LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
         // schedule left over system tasks
         for (ExecutorDetails exec : executorsNotScheduled) {
-            scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+            if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+                return mkNotEnoughResources(td);
+            }
         }
 
         SchedulingResult result;

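scheduleExecutor now reports success or failure and the strategy bails out on the first executor it cannot place, instead of logging an error and continuing. The control flow, reduced to a sketch with a hypothetical predicate standing in for scheduleExecutor:

    import java.util.List;
    import java.util.function.Predicate;

    final class FailFastSketch {
        static boolean scheduleAll(List<String> executors, Predicate<String> scheduleOne) {
            for (String exec : executors) {
                if (!scheduleOne.test(exec)) {
                    return false;  // the caller maps this to a FAIL_NOT_ENOUGH_RESOURCES result
                }
            }
            return true;
        }
    }
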
http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index af9c681..ce98abd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -30,6 +30,7 @@ import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +82,7 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
         }
         Collection<ExecutorDetails> unassignedExecutors =
             new HashSet<>(this.cluster.getUnassignedExecutors(td));
-        LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        LOG.debug("Num ExecutorsNeedScheduling: {}", unassignedExecutors.size());
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
         List<Component> spouts = this.getSpouts(td);
 
@@ -92,10 +93,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
         }
 
         //order executors to be scheduled
-        List<ExecutorDetails> orderedExecutors = this.orderExecutors(td, unassignedExecutors);
+        List<ExecutorDetails> orderedExecutors = orderExecutors(td, unassignedExecutors);
         Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
-        List<String> favoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
-        List<String> unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+        List<String> favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
+        List<String> unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
 
         for (ExecutorDetails exec : orderedExecutors) {
             LOG.debug(
@@ -103,17 +104,24 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
                 exec,
                 td.getExecutorToComponent().get(exec),
                 td.getTaskResourceReqList(exec));
-            final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
+            final Iterable<String> sortedNodes = sortAllNodes(td, exec, favoredNodeIds, unFavoredNodeIds);
 
-            scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+            if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+                return mkNotEnoughResources(td);
+            }
         }
 
         executorsNotScheduled.removeAll(scheduledTasks);
-        LOG.error("/* Scheduling left over task (most likely sys tasks) */");
-        // schedule left over system tasks
-        for (ExecutorDetails exec : executorsNotScheduled) {
-            final List<ObjectResources> sortedNodes = this.sortAllNodes(td, exec, favoredNodes, unFavoredNodes);
-            scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
+        if (!executorsNotScheduled.isEmpty()) {
+            LOG.warn("Scheduling {} left over task (most likely sys tasks)", executorsNotScheduled);
+            // schedule left over system tasks
+            for (ExecutorDetails exec : executorsNotScheduled) {
+                final Iterable<String> sortedNodes = sortAllNodes(td, exec, favoredNodeIds, unFavoredNodeIds);
+                if (!scheduleExecutor(exec, td, scheduledTasks, sortedNodes)) {
+                    return mkNotEnoughResources(td);
+                }
+            }
+            executorsNotScheduled.removeAll(scheduledTasks);
         }
 
         SchedulingResult result;

http://git-wip-us.apache.org/repos/asf/storm/blob/32392bc8/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 45a2436..74d3ae3 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.WorkerResources;
@@ -38,38 +40,27 @@ import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
+import org.apache.storm.testing.PerformanceTest;
 import org.apache.storm.testing.TestWordCounter;
 import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.DisallowedStrategyException;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.INimbusTest;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.TestBolt;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.TestSpout;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.addTopologies;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.createClusterConfig;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genSupervisors;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genTopology;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userRes;
-import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.userResourcePool;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
+import static org.junit.Assert.*;
 
 public class TestResourceAwareScheduler {
 
@@ -98,9 +89,9 @@ public class TestResourceAwareScheduler {
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
         Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
         assertEquals(5, nodes.size());
-        RAS_Node node = nodes.get("sup-0");
+        RAS_Node node = nodes.get("r000s000");
 
-        assertEquals("sup-0", node.getId());
+        assertEquals("r000s000", node.getId());
         assertTrue(node.isAlive());
         assertEquals(0, node.getRunningTopologies().size());
         assertTrue(node.isTotallyFree());
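
The expected node id changes from "sup-0" to "r000s000" because the generated supervisors are now laid out per rack. One plausible way such ids could be formed, purely for illustration (the real generator lives in TestUtilsForResourceAwareScheduler):

    // Hypothetical illustration of rack-aware supervisor ids such as "r000s000".
    public class SupervisorIdSketch {
        static String supervisorId(int rack, int superInRack) {
            return String.format("r%03ds%03d", rack, superInRack);
        }

        public static void main(String[] args) {
            System.out.println(supervisorId(0, 0));   // r000s000
            System.out.println(supervisorId(0, 1));   // r000s001
            System.out.println(supervisorId(12, 34)); // r012s034
        }
    }
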
@@ -436,16 +427,16 @@ public class TestResourceAwareScheduler {
 
         // Test2: When a supervisor fails, RAS does not alter existing assignments
         executorToSlot = new HashMap<>();
-        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0));
-        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1));
-        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 0));
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 1));
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1));
         Map<String, SchedulerAssignment> existingAssignments = new HashMap<>();
         assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot, null, null);
         existingAssignments.put(topology1.getId(), assignment);
         copyOfOldMapping = new HashMap<>(executorToSlot);
         Set<ExecutorDetails> existingExecutors = copyOfOldMapping.keySet();
         Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap);
-        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        supMap1.remove("r000s000"); // mock the supervisor r000s000 as a failed supervisor
 
         topologies = new Topologies(topology1);
         Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, topologies, config1);
@@ -462,19 +453,19 @@ public class TestResourceAwareScheduler {
 
         // Test3: When a supervisor and a worker on it fails, RAS does not alter existing assignments
         executorToSlot = new HashMap<>();
-        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1)); // the worker to orphan
-        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2)); // the worker that fails
-        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); // the healthy worker
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 1)); // the worker to orphan
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 2)); // the worker that fails
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1)); // the healthy worker
         existingAssignments = new HashMap<>();
         assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot, null, null);
         existingAssignments.put(topology1.getId(), assignment);
-        // delete one worker of sup-0 (failed) from topo1 assignment to enable actual schedule for testing
+        // delete one worker of r000s000 (failed) from topo1 assignment to enable actual schedule for testing
         executorToSlot.remove(new ExecutorDetails(1, 1));
 
         copyOfOldMapping = new HashMap<>(executorToSlot);
         existingExecutors = copyOfOldMapping.keySet(); // namely the two eds on the orphaned worker and the healthy worker
         supMap1 = new HashMap<>(supMap);
-        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        supMap1.remove("r000s000"); // mock the supervisor r000s000 as a failed supervisor
 
         topologies = new Topologies(topology1);
         cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, topologies, config1);
@@ -530,7 +521,7 @@ public class TestResourceAwareScheduler {
             for (int j = 0; j < 4; j++) {
                 ports.add(j);
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, i == 0 ? resourceMap1 : resourceMap2);
+            SupervisorDetails sup = new SupervisorDetails("r00s00" + i, "host-" + i, null, ports, i == 0 ? resourceMap1 : resourceMap2);
             supMap.put(sup.getId(), sup);
         }
         LOG.info("SUPERVISORS = {}", supMap);
@@ -702,7 +693,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
         String status = cluster.getStatusMap().get(topology2.getId());
         assert status.startsWith("Not enough resources to schedule") : status;
-        assert status.endsWith("0/5 executors scheduled") : status;
+        assert status.endsWith("5 executors not scheduled") : status;
         assertEquals(5, cluster.getUnassignedExecutors(topology2).size());
     }
 
@@ -989,4 +980,90 @@ public class TestResourceAwareScheduler {
         Object sched = ReflectionUtils.newSchedulerStrategyInstance(allowed, config);
         assertEquals(sched.getClass().getName(), allowed);
     }
+
+    @Category(PerformanceTest.class)
+    @Test(timeout=30_000)
+    public void testLargeTopologiesOnLargeClusters() {
+        testLargeTopologiesCommon(DefaultResourceAwareStrategy.class.getName(), false, 1);
+    }
+    
+    @Category(PerformanceTest.class)
+    @Test(timeout=60_000)
+    public void testLargeTopologiesOnLargeClustersGras() {
+        testLargeTopologiesCommon(GenericResourceAwareStrategy.class.getName(), true, 1);
+    }
+
+    public void testLargeTopologiesCommon(final String strategy, final boolean includeGpu, final int multiplier) {
+        INimbus iNimbus = new INimbusTest();
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithRacks(25 * multiplier, 40, 66, 3 * multiplier, 0, 4700, 226200, new HashMap<>());
+        if (includeGpu) {
+            HashMap<String, Double> extraResources = new HashMap<>();
+            extraResources.put("my.gpu", 1.0);
+            supMap.putAll(genSupervisorsWithRacks(3 * multiplier, 40, 66, 0, 0, 4700, 226200, extraResources));
+        }
+
+        Config config = new Config();
+        config.putAll(createClusterConfig(88, 775, 25, null));
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategy);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        Map<String, TopologyDetails> topologyDetailsMap = new HashMap<>();
+        for (int i = 0; i < 11 * multiplier; i++) {
+            TopologyDetails td = genTopology(String.format("topology-%05d", i), config, 5,
+                40, 30, 114, 0, 0, "user", 8192);
+            topologyDetailsMap.put(td.getId(), td);
+        }
+        if (includeGpu) {
+            for (int i = 0; i < multiplier; i++) {
+                TopologyBuilder builder = topologyBuilder(5, 40, 30, 114);
+                builder.setBolt("gpu-bolt", new TestBolt(), 40)
+                    .addResource("my.gpu", 1.0)
+                    .shuffleGrouping("spout-0");
+                TopologyDetails td = topoToTopologyDetails(String.format("topology-gpu-%05d", i), config, builder.createTopology(), 0, 0,
+                    "user", 8192);
+                topologyDetailsMap.put(td.getId(), td);
+            }
+        }
+        Topologies topologies = new Topologies(topologyDetailsMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<>(), topologies, config);
+
+        long startTime = Time.currentTimeMillis();
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+        long schedulingDuration = Time.currentTimeMillis() - startTime;
+        LOG.info("Scheduling took {} ms", schedulingDuration);
+        LOG.info("HAS {} SLOTS USED", cluster.getUsedSlots().size());
+
+        Map<String, SchedulerAssignment> assignments = new TreeMap<>(cluster.getAssignments());
+
+        for (Entry<String, SchedulerAssignment> entry: assignments.entrySet()) {
+            SchedulerAssignment sa = entry.getValue();
+            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
+            for (WorkerSlot slot : sa.getSlots()) {
+                String nodeId = slot.getNodeId();
+                String rack = supervisorIdToRackName(nodeId);
+                slotsPerRack.computeIfAbsent(rack, (r) -> new AtomicLong(0)).incrementAndGet();
+            }
+            LOG.info("{} => {}", entry.getKey(), slotsPerRack);
+        }
+    }
+
+    public static void main(String[] args) {
+        String strategy = DefaultResourceAwareStrategy.class.getName();
+        if (args.length > 0) {
+            strategy = args[0];
+        }
+        boolean includeGpu = false;
+        if (args.length > 1) {
+            includeGpu = Boolean.valueOf(args[1]);
+        }
+        int multiplier = 1;
+        if (args.length > 2) {
+            multiplier = Integer.valueOf(args[2]);
+        }
+        TestResourceAwareScheduler trs = new TestResourceAwareScheduler();
+        trs.testLargeTopologiesCommon(strategy, includeGpu, multiplier);
+        System.exit(0);
+    }
 }
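
The loop at the end of testLargeTopologiesCommon only tallies assigned slots per rack for logging. The gist, with the rack taken from the "rNNN" prefix of the supervisor id as a stand-in for supervisorIdToRackName:

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the per-rack slot count; node ids and the prefix-based rack lookup are stand-ins.
    public class RackSlotTallySketch {
        public static void main(String[] args) {
            String[] assignedNodeIds = {"r000s000", "r000s001", "r001s000", "r000s000"};
            Map<String, AtomicLong> slotsPerRack = new TreeMap<>();
            for (String nodeId : assignedNodeIds) {
                String rack = nodeId.substring(0, 4); // e.g. "r000"
                slotsPerRack.computeIfAbsent(rack, r -> new AtomicLong(0)).incrementAndGet();
            }
            System.out.println(slotsPerRack); // {r000=3, r001=1}
        }
    }

The main method allows the same scenario to be run outside JUnit; the optional arguments are the strategy class name, whether to add GPU supervisors, and a cluster-size multiplier.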