You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/08/14 14:46:02 UTC

[1/3] storm git commit: [STORM-1766] - A better algorithm server rack selection for RAS

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 66f5f68aa -> 560746f8e


[STORM-1766] - A better algorithm server rack selection for RAS


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d50a2437
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d50a2437
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d50a2437

Branch: refs/heads/1.x-branch
Commit: d50a2437923f2919fbee130b8e9e86a62a2d9f48
Parents: 9e9ec43
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed May 4 17:08:57 2016 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Aug 11 10:41:41 2016 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/scheduler/Cluster.java |   7 +
 .../storm/scheduler/resource/RAS_Node.java      |  11 +-
 .../DefaultResourceAwareStrategy.java           | 228 +++++++++----
 .../resource/TestResourceAwareScheduler.java    | 114 +------
 .../TestUtilsForResourceAwareScheduler.java     |  25 +-
 .../TestDefaultResourceAwareStrategy.java       | 333 +++++++++++++++++++
 6 files changed, 542 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index a6622ce..89cc1bc 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -103,6 +103,9 @@ public class Cluster {
         this.status.putAll(src.status);
         this.topologyResources.putAll(src.topologyResources);
         this.blackListedHosts.addAll(src.blackListedHosts);
+        if (src.networkTopography != null) {
+            this.networkTopography = new HashMap<String, List<String>>(src.networkTopography);
+        }
     }
     
     public void setBlacklistedHosts(Set<String> hosts) {
@@ -522,6 +525,10 @@ public class Cluster {
         return networkTopography;
     }
 
+    public void setNetworkTopography(Map<String, List<String>> networkTopography) {
+        this.networkTopography = networkTopography;
+    }
+
     private String getStringFromStringList(Object o) {
         StringBuilder sb = new StringBuilder();
         for (String s : (List<String>) o) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index 8d37805..79502d7 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -89,10 +89,6 @@ public class RAS_Node {
             _sup = sup;
             _availMemory = getTotalMemoryResources();
             _availCPU = getTotalCpuResources();
-
-            LOG.debug("Found a {} Node {} {}",
-                    _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
-            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
             //intialize resource usages on node
             intializeResources();
         }
@@ -363,8 +359,11 @@ public class RAS_Node {
     @Override
     public String toString() {
         return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
-                + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
-                + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
+                + ", Avail [ Mem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+                + ", CPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + ", Slots: " + this.getFreeSlots()
+                + "] Total [ Mem: " + ((_sup == null) ? "N/A" : this.getTotalMemoryResources())
+                + ", CPU: " + ((_sup == null) ? "N/A" : this.getTotalCpuResources()) + ", Slots: "
+                + this._slots.values() + " ]}";
     }
 
     public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {

http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index c4ce7ef..8957dc0 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -20,6 +20,7 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.Queue;
 import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.TreeSet;
 
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Topologies;
@@ -103,16 +105,16 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
         Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
         Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
-        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. 
+        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth.
         //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
         for (int i = 0; i < longestPriorityListSize; i++) {
             for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
                 Iterator<ExecutorDetails> it = entry.getValue().iterator();
                 if (it.hasNext()) {
                     ExecutorDetails exec = it.next();
-                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+                    LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} ] with rank {}",
                             new Object[] { exec, td.getExecutorToComponent().get(exec),
-                    td.getTaskResourceReqList(exec), entry.getKey() });
+                                    td.getTaskResourceReqList(exec), entry.getKey() });
                     scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
                     it.remove();
                 }
@@ -156,42 +158,49 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             schedulerAssignmentMap.get(targetSlot).add(exec);
             targetNode.consumeResourcesforTask(exec, td);
             scheduledTasks.add(exec);
-            LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                    targetNode, targetNode.getAvailableMemoryResources(),
+            LOG.debug("TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on slot: {} on Rack: {}", exec,
+                    targetNode.getHostname(), targetNode.getAvailableMemoryResources(),
                     targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                    targetNode.getTotalCpuResources(), targetSlot);
+                    targetNode.getTotalCpuResources(), targetSlot, nodeToRack(targetNode));
         } else {
             LOG.error("Not Enough Resources to schedule Task {}", exec);
         }
     }
 
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-      WorkerSlot ws;
-      // first scheduling
-      if (this.refNode == null) {
-          String clus = this.getBestClustering();
-          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
-      } else {
-          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
-      }
-      if(ws != null) {
-          this.refNode = this.idToNode(ws.getNodeId());
-      }
-      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
-      return ws;
+        WorkerSlot ws = null;
+        // first scheduling
+        if (this.refNode == null) {
+            // iterate through an ordered list of all racks available to make sure we cannot schedule the first executor in any rack before we "give up"
+            // the list is ordered in decreasing order of effective resources. With the rack in the front of the list having the most effective resources.
+            for (RackResources rack : sortRacks(td.getId())) {
+                ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap);
+                if (ws != null) {
+                    LOG.debug("best rack: {}", rack.id);
+                    break;
+                }
+            }
+        } else {
+            ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+        }
+        if (ws != null) {
+            this.refNode = this.idToNode(ws.getNodeId());
+        }
+        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+        return ws;
     }
 
     private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
     }
 
-    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         double taskMem = td.getTotalMemReqTask(exec);
         double taskCPU = td.getTotalCpuReqTask(exec);
         List<RAS_Node> nodes;
-        if(clusterId != null) {
-            nodes = this.getAvailableNodesFromCluster(clusterId);
-            
+        if(rackId != null) {
+            nodes = this.getAvailableNodesFromRack(rackId);
+
         } else {
             nodes = this.getAvailableNodes();
         }
@@ -227,71 +236,178 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return null;
     }
 
-    private String getBestClustering() {
-        String bestCluster = null;
-        Double mostRes = 0.0;
-        for (Entry<String, List<String>> cluster : _clusterInfo
-                .entrySet()) {
-            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
-            if (clusterTotalRes > mostRes) {
-                mostRes = clusterTotalRes;
-                bestCluster = cluster.getKey();
+    /**
+     * class to keep track of resources on a rack
+     */
+    class RackResources {
+        String id;
+        double availMem = 0.0;
+        double totalMem = 0.0;
+        double availCpu = 0.0;
+        double totalCpu = 0.0;
+        int freeSlots = 0;
+        int totalSlots = 0;
+        double effectiveResources = 0.0;
+        public RackResources(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return this.id;
+        }
+    }
+
+    /**
+     *
+     * @param topoId
+     * @return a sorted list of racks
+     * Racks are sorted by two criteria. 1) the number executors of the topology that needs to be scheduled is already on the rack in descending order.
+     * The reasoning to sort based on  criterion 1 is so we schedule the rest of a topology on the same rack as the existing executors of the topology.
+     * 2) the subordinate/subservient resource availability percentage of a rack in descending order
+     * We calculate the resource availability percentage by dividing the resource availability on the rack by the resource availability of the entire cluster
+     * By doing this calculation, racks that have exhausted or little of one of the resources mentioned above will be ranked after racks that have more balanced resource availability.
+     * So we will be less likely to pick a rack that have a lot of one resource but a low amount of another.
+     */
+    TreeSet<RackResources> sortRacks(final String topoId) {
+        List<RackResources> racks = new LinkedList<RackResources>();
+        final Map<String, String> nodeIdToRackId = new HashMap<String, String>();
+        double availMemResourcesOverall = 0.0;
+        double totalMemResourcesOverall = 0.0;
+
+        double availCpuResourcesOverall = 0.0;
+        double totalCpuResourcesOverall = 0.0;
+
+        int freeSlotsOverall = 0;
+        int totalSlotsOverall = 0;
+
+        for (Entry<String, List<String>> entry : _clusterInfo.entrySet()) {
+            String rackId = entry.getKey();
+            List<String> nodeIds = entry.getValue();
+            RackResources rack = new RackResources(rackId);
+            racks.add(rack);
+            for (String nodeId : nodeIds) {
+                RAS_Node node = _nodes.getNodeById(this.NodeHostnameToId(nodeId));
+                double availMem = node.getAvailableMemoryResources();
+                double availCpu = node.getAvailableCpuResources();
+                double freeSlots = node.totalSlotsFree();
+                double totalMem = node.getTotalMemoryResources();
+                double totalCpu = node.getTotalCpuResources();
+                double totalSlots = node.totalSlots();
+
+                rack.availMem += availMem;
+                rack.totalMem += totalMem;
+                rack.availCpu += availCpu;
+                rack.totalCpu += totalCpu;
+                rack.freeSlots += freeSlots;
+                rack.totalSlots += totalSlots;
+                nodeIdToRackId.put(nodeId, rack.id);
+
+                availMemResourcesOverall += availMem;
+                availCpuResourcesOverall += availCpu;
+                freeSlotsOverall += freeSlots;
+
+                totalMemResourcesOverall += totalMem;
+                totalCpuResourcesOverall += totalCpu;
+                totalSlotsOverall += totalSlots;
             }
         }
-        return bestCluster;
+
+        LOG.debug("Cluster Overall Avail [ CPU {} MEM {} Slots {} ] Total [ CPU {} MEM {} Slots {} ]",
+                availCpuResourcesOverall, availMemResourcesOverall, freeSlotsOverall, totalCpuResourcesOverall, totalMemResourcesOverall, totalSlotsOverall);
+        for (RackResources rack : racks) {
+            if (availCpuResourcesOverall <= 0.0 || availMemResourcesOverall <= 0.0 || freeSlotsOverall <= 0.0) {
+                rack.effectiveResources = 0.0;
+            } else {
+                rack.effectiveResources = Math.min(Math.min((rack.availCpu / availCpuResourcesOverall), (rack.availMem / availMemResourcesOverall)), ((double) rack.freeSlots / (double) freeSlotsOverall));
+            }
+            LOG.debug("rack: {} Avail [ CPU {}({}%) MEM {}({}%) Slots {}({}%) ] Total [ CPU {} MEM {} Slots {} ] effective resources: {}",
+                    rack.id, rack.availCpu, rack.availCpu/availCpuResourcesOverall * 100.0, rack.availMem, rack.availMem/availMemResourcesOverall * 100.0,
+                    rack.freeSlots, ((double) rack.freeSlots / (double) freeSlotsOverall) * 100.0, rack.totalCpu, rack.totalMem, rack.totalSlots, rack.effectiveResources);
+        }
+
+        TreeSet<RackResources> sortedRacks = new TreeSet<RackResources>(new Comparator<RackResources>() {
+            @Override
+            public int compare(RackResources o1, RackResources o2) {
+
+                int execsScheduledInRack1 = getExecutorsScheduledinRack(topoId, o1.id, nodeIdToRackId).size();
+                int execsScheduledInRack2 = getExecutorsScheduledinRack(topoId, o2.id, nodeIdToRackId).size();
+                if (execsScheduledInRack1 > execsScheduledInRack2) {
+                    return -1;
+                } else if (execsScheduledInRack1 < execsScheduledInRack2) {
+                    return 1;
+                } else {
+                    if (o1.effectiveResources > o2.effectiveResources) {
+                        return -1;
+                    } else if (o1.effectiveResources < o2.effectiveResources) {
+                        return 1;
+                    }
+                    else {
+                        return o1.id.compareTo(o2.id);
+                    }
+                }
+            }
+        });
+        sortedRacks.addAll(racks);
+        LOG.debug("Sorted rack: {}", sortedRacks);
+        return sortedRacks;
     }
 
-    private Double getTotalClusterRes(List<String> cluster) {
-        Double res = 0.0;
-        for (String node : cluster) {
-            res += _nodes.getNodeById(this.NodeHostnameToId(node))
-                    .getAvailableMemoryResources()
-                    + _nodes.getNodeById(this.NodeHostnameToId(node))
-                    .getAvailableCpuResources();
+    private Collection<ExecutorDetails> getExecutorsScheduledinRack(String topoId, String rackId, Map<String, String> nodeToRack) {
+        Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
+        if (_cluster.getAssignmentById(topoId) != null) {
+            for (Entry<ExecutorDetails, WorkerSlot> entry : _cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                String nodeId = entry.getValue().getNodeId();
+                String hostname = idToNode(nodeId).getHostname();
+                ExecutorDetails exec = entry.getKey();
+                if (nodeToRack.get(hostname) != null && nodeToRack.get(hostname).equals(rackId)) {
+                    execs.add(exec);
+                }
+            }
         }
-        return res;
+        return execs;
     }
 
     private Double distToNode(RAS_Node src, RAS_Node dest) {
         if (src.getId().equals(dest.getId())) {
             return 0.0;
-        } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
+        } else if (this.nodeToRack(src).equals(this.nodeToRack(dest))) {
             return 0.5;
         } else {
             return 1.0;
         }
     }
 
-    private String NodeToCluster(RAS_Node node) {
+    private String nodeToRack(RAS_Node node) {
         for (Entry<String, List<String>> entry : _clusterInfo
                 .entrySet()) {
             if (entry.getValue().contains(node.getHostname())) {
                 return entry.getKey();
             }
         }
-        LOG.error("Node: {} not found in any clusters", node.getHostname());
+        LOG.error("Node: {} not found in any racks", node.getHostname());
         return null;
     }
-    
+
     private List<RAS_Node> getAvailableNodes() {
         LinkedList<RAS_Node> nodes = new LinkedList<>();
-        for (String clusterId : _clusterInfo.keySet()) {
-            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
+        for (String rackId : _clusterInfo.keySet()) {
+            nodes.addAll(this.getAvailableNodesFromRack(rackId));
         }
         return nodes;
     }
 
-    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+    private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
         List<RAS_Node> retList = new ArrayList<>();
-        for (String node_id : _clusterInfo.get(clus)) {
+        for (String node_id : _clusterInfo.get(rackId)) {
             retList.add(_nodes.getNodeById(this
                     .NodeHostnameToId(node_id)));
         }
         return retList;
     }
 
-    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
-        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+    private List<WorkerSlot> getAvailableWorkersFromRack(String rackId) {
+        List<RAS_Node> nodes = this.getAvailableNodesFromRack(rackId);
         List<WorkerSlot> workers = new LinkedList<>();
         for(RAS_Node node : nodes) {
             workers.addAll(node.getFreeSlots());
@@ -301,8 +417,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
 
     private List<WorkerSlot> getAvailableWorker() {
         List<WorkerSlot> workers = new LinkedList<>();
-        for (String clusterId : _clusterInfo.keySet()) {
-            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
+        for (String rackId : _clusterInfo.keySet()) {
+            workers.addAll(this.getAvailableWorkersFromRack(rackId));
         }
         return workers;
     }
@@ -391,7 +507,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             for(ExecutorDetails exec : execs) {
                 totalMem += td.getTotalMemReqTask(exec);
             }
-        } 
+        }
         return totalMem;
     }
 
@@ -424,8 +540,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             for(String nodeHostname : clusterEntry.getValue()) {
                 RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
                 retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
-                retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
-                retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
+                retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + " Slots: " + node.totalSlotsFree() + "}\n";
+                retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + " Slots: " + node.totalSlots() + "}\n";
             }
         }
         return retVal;

http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 9f8b980..054ecc2 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -29,8 +29,6 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
-import org.apache.storm.testing.TestWordCounter;
-import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.SpoutDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
@@ -44,11 +42,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
 
 public class TestResourceAwareScheduler {
 
@@ -144,6 +141,10 @@ public class TestResourceAwareScheduler {
         config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
 
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 128.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 0.0);
+
         config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
 
         Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
@@ -1236,107 +1237,6 @@ public class TestResourceAwareScheduler {
     }
 
     /**
-     * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
-     */
-    @Test
-    public void testDefaultResourceAwareStrategy() {
-        int spoutParallelism = 1;
-        int boltParallelism = 2;
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
-                spoutParallelism);
-        builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
-                boltParallelism).shuffleGrouping("spout");
-        builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
-                boltParallelism).shuffleGrouping("bolt-1");
-        builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
-                boltParallelism).shuffleGrouping("bolt-2");
-
-        StormTopology stormToplogy = builder.createTopology();
-
-        Config conf = new Config();
-        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
-        Map<String, Number> resourceMap = new HashMap<String, Number>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
-        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
-        conf.putAll(Utils.readDefaultConfig());
-        conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
-        conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
-        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
-        conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
-        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
-        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
-        conf.put(Config.TOPOLOGY_PRIORITY, 0);
-        conf.put(Config.TOPOLOGY_NAME, "testTopology");
-        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
-
-        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
-                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy, spoutParallelism, boltParallelism)
-                , this.currentTime);
-
-        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-        topoMap.put(topo.getId(), topo);
-        Topologies topologies = new Topologies(topoMap);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
-
-        ResourceAwareScheduler rs = new ResourceAwareScheduler();
-
-        rs.prepare(conf);
-        rs.schedule(topologies, cluster);
-
-        Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
-            WorkerSlot ws = entry.getValue();
-            ExecutorDetails exec = entry.getKey();
-            if (!nodeToComps.containsKey(ws.getNodeId())) {
-                nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
-            }
-            nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
-        }
-
-        /**
-         * check for correct scheduling
-         * Since all the resource availabilites on nodes are the same in the beginining
-         * DefaultResourceAwareStrategy can arbitrarily pick one thus we must find if a particular scheduling
-         * exists on a node the the cluster.
-         */
-
-        //one node should have the below scheduling
-        List<String> node1 = new LinkedList<>();
-        node1.add("spout");
-        node1.add("bolt-1");
-        node1.add("bolt-2");
-        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
-
-        //one node should have the below scheduling
-        List<String> node2 = new LinkedList<>();
-        node2.add("bolt-3");
-        node2.add("bolt-1");
-        node2.add("bolt-2");
-
-        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
-
-        //one node should have the below scheduling
-        List<String> node3 = new LinkedList<>();
-        node3.add("bolt-3");
-
-        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
-
-        //three used and one node should be empty
-        Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
-    }
-
-    private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
-        for (List<String> entry : nodeToComps.values()) {
-            if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
      * test if free slots on nodes work correctly
      */
     @Test
@@ -1423,7 +1323,7 @@ public class TestResourceAwareScheduler {
         topoMap.put(topo3.getId(), topo3);
 
         Topologies topologies = new Topologies(topoMap);
-        
+
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
 
         rs.prepare(config);
@@ -1472,7 +1372,7 @@ public class TestResourceAwareScheduler {
         StormTopology stormTopology = builder.createTopology();
         TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology,
                 0,
-                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0);
+                genExecsAndComps(stormTopology), 0);
 
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..fbc0421 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -41,7 +41,6 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,33 +93,44 @@ public class TestUtilsForResourceAwareScheduler {
     }
 
     public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
+        return genSupervisors(numSup, numPorts, 0, resourceMap);
+    }
+
+    public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, int start, Map resourceMap) {
         Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
-        for (int i = 0; i < numSup; i++) {
+        for (int i = start; i < numSup + start; i++) {
             List<Number> ports = new LinkedList<Number>();
             for (int j = 0; j < numPorts; j++) {
                 ports.add(j);
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, resourceMap);
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, new HashMap<String, Double>(resourceMap));
             retList.put(sup.getId(), sup);
         }
         return retList;
     }
 
-    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
+    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology) {
         Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
         int startTask = 0;
         int endTask = 1;
         for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+            SpoutSpec spout = entry.getValue();
+            String spoutId = entry.getKey();
+            int spoutParallelism = spout.get_common().get_parallelism_hint();
+
             for (int i = 0; i < spoutParallelism; i++) {
-                retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+                retMap.put(new ExecutorDetails(startTask, endTask), spoutId);
                 startTask++;
                 endTask++;
             }
         }
 
         for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+            String boltId = entry.getKey();
+            Bolt bolt = entry.getValue();
+            int boltParallelism = bolt.get_common().get_parallelism_hint();
             for (int i = 0; i < boltParallelism; i++) {
-                retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+                retMap.put(new ExecutorDetails(startTask, endTask), boltId);
                 startTask++;
                 endTask++;
             }
@@ -139,7 +149,7 @@ public class TestUtilsForResourceAwareScheduler {
         StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
-                genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
+                genExecsAndComps(topology), launchTime);
         return topo;
     }
 
@@ -161,6 +171,7 @@ public class TestUtilsForResourceAwareScheduler {
             }
             BoltDeclarer b1 = builder.setBolt("bolt-" + i, new TestBolt(),
                     boltParallelism).shuffleGrouping("spout-" + j);
+            j++;
         }
 
         return builder.createTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/d50a2437/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..77f23aa
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.networktopography.DNSToSwitchMapping;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.INimbus;
+import org.apache.storm.scheduler.SchedulerAssignmentImpl;
+import org.apache.storm.scheduler.SupervisorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.RackResources;
+import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.User;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+
+public class TestDefaultResourceAwareStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestDefaultResourceAwareStrategy.class);
+
+    private static int currentTime = 1450418597;
+
+    /**
+     * test if the scheduling logic for the DefaultResourceAwareStrategy is correct
+     */
+    @Test
+    public void testDefaultResourceAwareStrategy() {
+        int spoutParallelism = 1;
+        int boltParallelism = 2;
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
+                spoutParallelism);
+        builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("bolt-1");
+        builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("bolt-2");
+
+        StormTopology stormToplogy = builder.createTopology();
+
+        Config conf = new Config();
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        conf.putAll(Utils.readDefaultConfig());
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormToplogy)
+                , this.currentTime);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo.getId(), topo);
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(conf);
+        rs.schedule(topologies, cluster);
+
+        Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
+            WorkerSlot ws = entry.getValue();
+            ExecutorDetails exec = entry.getKey();
+            if (!nodeToComps.containsKey(ws.getNodeId())) {
+                nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
+            }
+            nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
+        }
+
+        /**
+         * check for correct scheduling
+         * Since all the resource availabilites on nodes are the same in the beginining
+         * DefaultResourceAwareStrategy can arbitrarily pick one thus we must find if a particular scheduling
+         * exists on a node the the cluster.
+         */
+
+        //one node should have the below scheduling
+        List<String> node1 = new LinkedList<>();
+        node1.add("spout");
+        node1.add("bolt-1");
+        node1.add("bolt-2");
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
+
+        //one node should have the below scheduling
+        List<String> node2 = new LinkedList<>();
+        node2.add("bolt-3");
+        node2.add("bolt-1");
+        node2.add("bolt-2");
+
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
+
+        //one node should have the below scheduling
+        List<String> node3 = new LinkedList<>();
+        node3.add("bolt-3");
+
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
+
+        //three used and one node should be empty
+        Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
+    }
+
+    private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
+        for (List<String> entry : nodeToComps.values()) {
+            if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Test whether strategy will choose correct rack
+     */
+    @Test
+    public void testMultipleRacks() {
+
+        final Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        //generate a rack of supervisors
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0);
+        final Map<String, SupervisorDetails> supMapRack1 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 0, resourceMap);
+
+        //generate another rack of supervisors with less resources
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4000.0);
+        final Map<String, SupervisorDetails> supMapRack2 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 10, resourceMap);
+
+        //generate some supervisors that are depleted of one resource
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 0.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0);
+        final Map<String, SupervisorDetails> supMapRack3 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 20, resourceMap);
+
+        //generate some that has alot of memory but little of cpu
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 10.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 8000.0 *2 + 4000.0);
+        final Map<String, SupervisorDetails> supMapRack4 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 30, resourceMap);
+
+        //generate some that has alot of cpu but little of memory
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0 + 200.0 + 10.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        final Map<String, SupervisorDetails> supMapRack5 = TestUtilsForResourceAwareScheduler.genSupervisors(10, 4, 40, resourceMap);
+
+
+        supMap.putAll(supMapRack1);
+        supMap.putAll(supMapRack2);
+        supMap.putAll(supMapRack3);
+        supMap.putAll(supMapRack4);
+        supMap.putAll(supMapRack5);
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        //create test DNSToSwitchMapping plugin
+        DNSToSwitchMapping TestNetworkTopographyPlugin = new DNSToSwitchMapping() {
+            @Override
+            public Map<String, String> resolve(List<String> names) {
+                Map<String, String> ret = new HashMap<String, String>();
+                for (SupervisorDetails sup : supMapRack1.values()) {
+                    ret.put(sup.getHost(), "rack-0");
+                }
+                for (SupervisorDetails sup : supMapRack2.values()) {
+                    ret.put(sup.getHost(), "rack-1");
+                }
+                for (SupervisorDetails sup : supMapRack3.values()) {
+                    ret.put(sup.getHost(), "rack-2");
+                }
+                for (SupervisorDetails sup : supMapRack4.values()) {
+                    ret.put(sup.getHost(), "rack-3");
+                }
+                for (SupervisorDetails sup : supMapRack5.values()) {
+                    ret.put(sup.getHost(), "rack-4");
+                }
+                return ret;
+            }
+        };
+
+        List<String> supHostnames = new LinkedList<>();
+        for (SupervisorDetails sup : supMap.values()) {
+            supHostnames.add(sup.getHost());
+        }
+        Map<String, List<String>> rackToNodes = new HashMap<>();
+        Map<String, String> resolvedSuperVisors =  TestNetworkTopographyPlugin.resolve(supHostnames);
+        for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
+            String hostName = entry.getKey();
+            String rack = entry.getValue();
+            List<String> nodesForRack = rackToNodes.get(rack);
+            if (nodesForRack == null) {
+                nodesForRack = new ArrayList<String>();
+                rackToNodes.put(rack, nodesForRack);
+            }
+            nodesForRack.add(hostName);
+        }
+        cluster.setNetworkTopography(rackToNodes);
+
+        //generate topologies
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 8, 0, 2, 0, currentTime - 2, 10);
+
+        topoMap.put(topo1.getId(), topo1);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy();
+
+        rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config));
+        TreeSet<RackResources> sortedRacks= rs.sortRacks(topo1.getId());
+
+        Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
+        Iterator<RackResources> it = sortedRacks.iterator();
+        // Ranked first since rack-0 has the most balanced set of resources
+        Assert.assertEquals("rack-0 should be ordered first", "rack-0", it.next().id);
+        // Ranked second since rack-1 has a balanced set of resources but less than rack-0
+        Assert.assertEquals("rack-1 should be ordered second", "rack-1", it.next().id);
+        // Ranked third since rack-4 has a lot of cpu but not a lot of memory
+        Assert.assertEquals("rack-4 should be ordered third", "rack-4", it.next().id);
+        // Ranked fourth since rack-3 has alot of memory but not cpu
+        Assert.assertEquals("rack-3 should be ordered fourth", "rack-3", it.next().id);
+        //Ranked last since rack-2 has not cpu resources
+        Assert.assertEquals("rack-2 should be ordered fifth", "rack-2", it.next().id);
+
+        SchedulingResult schedulingResult = rs.schedule(topo1);
+        for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : schedulingResult.getSchedulingResultMap().entrySet()) {
+            WorkerSlot ws = entry.getKey();
+            Collection<ExecutorDetails> execs = entry.getValue();
+            //make sure all workers on scheduled in rack-0
+            Assert.assertEquals("assert worker scheduled on rack-0", "rack-0", resolvedSuperVisors.get(rs.idToNode(ws.getNodeId()).getHostname()));
+            // make actual assignments
+            cluster.assign(ws, topo1.getId(), execs);
+        }
+        Assert.assertEquals("All executors in topo-1 scheduled", 0, cluster.getUnassignedExecutors(topo1).size());
+
+        //Test if topology is already partially scheduled on one rack
+
+        topoMap.put(topo2.getId(), topo2);
+        topologies = new Topologies(topoMap);
+        RAS_Nodes nodes = new RAS_Nodes(cluster, topologies);
+        Iterator<ExecutorDetails> executorIterator = topo2.getExecutors().iterator();
+        List<String> nodeHostnames = rackToNodes.get("rack-1");
+        for (int i=0 ; i< topo2.getExecutors().size()/2; i++) {
+            String nodeHostname = nodeHostnames.get(i % nodeHostnames.size());
+            RAS_Node node = rs.idToNode(rs.NodeHostnameToId(nodeHostname));
+            WorkerSlot targetSlot = node.getFreeSlots().iterator().next();
+            ExecutorDetails targetExec = executorIterator.next();
+            // to keep track of free slots
+            node.assign(targetSlot, topo2, Arrays.asList(targetExec));
+            // to actually assign
+            cluster.assign(targetSlot, topo2.getId(), Arrays.asList(targetExec));
+        }
+
+        topologies.getById(topo2.getId()).getTotalMemoryResourceList();
+
+        rs = new DefaultResourceAwareStrategy();
+        rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, topologies, config));
+        // schedule topo2
+        schedulingResult = rs.schedule(topo2);
+
+        // checking assignments
+        for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : schedulingResult.getSchedulingResultMap().entrySet()) {
+            WorkerSlot ws = entry.getKey();
+            Collection<ExecutorDetails> execs = entry.getValue();
+            //make sure all workers on scheduled in rack-1
+            Assert.assertEquals("assert worker scheduled on rack-1", "rack-1", resolvedSuperVisors.get(rs.idToNode(ws.getNodeId()).getHostname()));
+            // make actual assignments
+            cluster.assign(ws, topo2.getId(), execs);
+        }
+        Assert.assertEquals("All executors in topo-2 scheduled", 0, cluster.getUnassignedExecutors(topo1).size());
+    }
+}


[3/3] storm git commit: add STORM-1766 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1766 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/560746f8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/560746f8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/560746f8

Branch: refs/heads/1.x-branch
Commit: 560746f8ee4c3e570fa7df615829656f44459306
Parents: 790308d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:45:44 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:45:44 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/560746f8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b190cc3..dca6f9d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1766: A better algorithm server rack selection for RAS
  * STORM-1913: Additions and Improvements for Trident RAS API
  * STORM-2037: debug operation should be whitelisted in SimpleAclAuthorizer.
  * STORM-2023: Add calcite-core to dependency of storm-sql-runtime


[2/3] storm git commit: Merge branch '1.x-STORM-1766' of https://github.com/jerrypeng/storm into STORM-1766-1.x

Posted by ka...@apache.org.
Merge branch '1.x-STORM-1766' of https://github.com/jerrypeng/storm into STORM-1766-1.x


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/790308d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/790308d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/790308d2

Branch: refs/heads/1.x-branch
Commit: 790308d204584dbb12ce2dfd49bb61f55ba1c6c2
Parents: 66f5f68 d50a243
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Aug 14 23:39:03 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Aug 14 23:39:03 2016 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/scheduler/Cluster.java |   7 +
 .../storm/scheduler/resource/RAS_Node.java      |  11 +-
 .../DefaultResourceAwareStrategy.java           | 228 +++++++++----
 .../resource/TestResourceAwareScheduler.java    | 114 +------
 .../TestUtilsForResourceAwareScheduler.java     |  24 +-
 .../TestDefaultResourceAwareStrategy.java       | 333 +++++++++++++++++++
 6 files changed, 541 insertions(+), 176 deletions(-)
----------------------------------------------------------------------