You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:03 UTC

[04/20] storm git commit: STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
new file mode 100644
index 0000000..703f4b8
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.EnumSet;
+
+/**
+ * Result codes for a scheduling attempt: {@link #SUCCESS} or one of the {@code FAIL_*} reasons.
+ */
+public enum SchedulingStatus {
+    SUCCESS,
+    FAIL_NOT_ENOUGH_RESOURCES,
+    FAIL_INVALID_TOPOLOGY,
+    FAIL_OTHER;
+
+    /**
+     * Statuses that count as success. {@code final} so the shared reference cannot be reassigned;
+     * the {@link EnumSet} itself is still mutable and must not be modified by callers.
+     */
+    public static final EnumSet<SchedulingStatus> success = EnumSet.of(SUCCESS);
+
+    /**
+     * Statuses that count as failure. {@code final} so the shared reference cannot be reassigned;
+     * the {@link EnumSet} itself is still mutable and must not be modified by callers.
+     */
+    public static final EnumSet<SchedulingStatus> failure =
+        EnumSet.of(FAIL_INVALID_TOPOLOGY, FAIL_NOT_ENOUGH_RESOURCES, FAIL_OTHER);
+
+    /**
+     * Checks whether a status represents a successful scheduling attempt.
+     *
+     * @param status the status to classify; {@code null} is reported as not successful
+     * @return {@code true} if {@code status} is in {@link #success}
+     */
+    public static boolean isStatusSuccess(SchedulingStatus status) {
+        return success.contains(status);
+    }
+
+    /**
+     * Checks whether a status represents a failed scheduling attempt.
+     *
+     * @param status the status to classify; {@code null} is reported as not a failure
+     * @return {@code true} if {@code status} is in {@link #failure}
+     */
+    public static boolean isStatusFailure(SchedulingStatus status) {
+        return failure.contains(status);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
new file mode 100644
index 0000000..4850ebb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.ToDoubleFunction;
+
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+
+/**
+ * A topology owner as tracked by the resource-aware scheduler: its CPU/memory guarantees, the
+ * topologies it owns in a given scheduling state, and the topologies marked as unsuccessful.
+ */
+public class User {
+    private final String userId;
+
+    // Topologies that were deemed invalid; they are excluded from getPendingTopologies.
+    private final Set<TopologyDetails> unsuccess = new HashSet<>();
+
+    private final double cpuGuarantee;
+    private final double memoryGuarantee;
+
+    /**
+     * Creates a user with a CPU and memory guarantee of 0.
+     *
+     * @param userId the user's id
+     */
+    public User(String userId) {
+        this(userId, 0, 0);
+    }
+
+    /**
+     * Creates a user whose guarantees are read from the {@code "cpu"} and {@code "memory"} keys of
+     * {@code resourcePool}. A null map, a missing key, or a null value yields a guarantee of 0.
+     *
+     * @param userId the user's id
+     * @param resourcePool the user's resource pool; may be null
+     */
+    public User(String userId, Map<String, Double> resourcePool) {
+        this(userId, guaranteeOf(resourcePool, "cpu"), guaranteeOf(resourcePool, "memory"));
+    }
+
+    private User(String userId, double cpuGuarantee, double memoryGuarantee) {
+        this.userId = userId;
+        this.cpuGuarantee = cpuGuarantee;
+        this.memoryGuarantee = memoryGuarantee;
+    }
+
+    /**
+     * Reads one guarantee from {@code resourcePool}, treating a null pool, a missing key, or an explicit
+     * null value as 0 ({@code getOrDefault} would return the explicit null and unboxing would throw).
+     */
+    private static double guaranteeOf(Map<String, Double> resourcePool, String key) {
+        if (resourcePool == null) {
+            return 0.0;
+        }
+        Double value = resourcePool.get(key);
+        return value == null ? 0.0 : value;
+    }
+
+    public String getId() {
+        return this.userId;
+    }
+
+    /**
+     * Returns this user's topologies that do not need scheduling in {@code cluster}.
+     *
+     * @param cluster the scheduling state to inspect
+     * @return a new set ordered by {@link PQsortByPriorityAndSubmittionTime}; never null
+     */
+    public TreeSet<TopologyDetails> getRunningTopologies(ISchedulingState cluster) {
+        TreeSet<TopologyDetails> ret = new TreeSet<>(new PQsortByPriorityAndSubmittionTime());
+        for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+            if (!cluster.needsSchedulingRas(td)) {
+                ret.add(td);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Returns this user's topologies that need scheduling in {@code cluster}, excluding any marked
+     * unsuccessful.
+     *
+     * @param cluster the scheduling state to inspect
+     * @return a new set ordered by {@link PQsortByPriorityAndSubmittionTime}; never null
+     */
+    public TreeSet<TopologyDetails> getPendingTopologies(ISchedulingState cluster) {
+        TreeSet<TopologyDetails> ret = new TreeSet<>(new PQsortByPriorityAndSubmittionTime());
+        for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+            if (cluster.needsSchedulingRas(td) && !unsuccess.contains(td)) {
+                ret.add(td);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Marks a topology as unsuccessful so it is no longer reported as pending, and records a status
+     * message for it on {@code cluster} when one is given.
+     *
+     * @param topo the topology that could not be scheduled
+     * @param cluster where to record the status; may be null
+     */
+    public void markTopoUnsuccess(TopologyDetails topo, Cluster cluster) {
+        unsuccess.add(topo);
+        if (cluster != null) {
+            cluster.setStatus(topo.getId(), "Scheduling Attempted but topology is invalid");
+        }
+    }
+
+    /**
+     * Marks a topology as unsuccessful without recording a cluster status.
+     *
+     * @param topo the topology that could not be scheduled
+     */
+    public void markTopoUnsuccess(TopologyDetails topo) {
+        this.markTopoUnsuccess(topo, null);
+    }
+
+    /**
+     * Returns the mean of {@link #getCpuResourcePoolUtilization} and
+     * {@link #getMemoryResourcePoolUtilization}.
+     *
+     * @param cluster the scheduling state to measure
+     * @return the average utilization; huge (on the order of {@link Double#MAX_VALUE}) if a guarantee is 0
+     */
+    public double getResourcePoolAverageUtilization(ISchedulingState cluster) {
+        double cpuResourcePoolUtilization = getCpuResourcePoolUtilization(cluster);
+        double memoryResourcePoolUtilization = getMemoryResourcePoolUtilization(cluster);
+
+        // Halve each term before adding rather than computing (cpu + memory) / 2: either term can be
+        // Double.MAX_VALUE, and the sum would overflow to infinity.
+        return (cpuResourcePoolUtilization / 2.0) + (memoryResourcePoolUtilization / 2.0);
+    }
+
+    /**
+     * Returns the CPU assigned to this user's topologies divided by the CPU guarantee.
+     *
+     * @param cluster the scheduling state to measure
+     * @return the utilization ratio, or {@link Double#MAX_VALUE} when the guarantee is 0
+     */
+    public double getCpuResourcePoolUtilization(ISchedulingState cluster) {
+        if (cpuGuarantee == 0.0) {
+            return Double.MAX_VALUE;
+        }
+        return getCpuResourceUsedByUser(cluster) / cpuGuarantee;
+    }
+
+    /**
+     * Returns the memory assigned to this user's topologies divided by the memory guarantee.
+     *
+     * @param cluster the scheduling state to measure
+     * @return the utilization ratio, or {@link Double#MAX_VALUE} when the guarantee is 0
+     */
+    public double getMemoryResourcePoolUtilization(ISchedulingState cluster) {
+        if (memoryGuarantee == 0.0) {
+            return Double.MAX_VALUE;
+        }
+        return getMemoryResourceUsedByUser(cluster) / memoryGuarantee;
+    }
+
+    /**
+     * Sums the CPU assigned to this user's topologies that have an assignment in {@code cluster}.
+     */
+    public double getCpuResourceUsedByUser(ISchedulingState cluster) {
+        return sumAssigned(cluster, TopologyResources::getAssignedCpu);
+    }
+
+    /**
+     * Sums the on-heap plus off-heap memory assigned to this user's topologies that have an
+     * assignment in {@code cluster}.
+     */
+    public double getMemoryResourceUsedByUser(ISchedulingState cluster) {
+        return sumAssigned(cluster, tr -> tr.getAssignedMemOnHeap() + tr.getAssignedMemOffHeap());
+    }
+
+    /**
+     * Applies {@code metric} to every topology owned by this user that has an assignment in
+     * {@code cluster} and returns the sum; unassigned topologies contribute nothing.
+     */
+    private double sumAssigned(ISchedulingState cluster, ToDoubleFunction<TopologyResources> metric) {
+        double sum = 0.0;
+        for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+            SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
+            if (assignment != null) {
+                sum += metric.applyAsDouble(new TopologyResources(td, assignment));
+            }
+        }
+        return sum;
+    }
+
+    public double getMemoryResourceGuaranteed() {
+        return memoryGuarantee;
+    }
+
+    public double getCpuResourceGuaranteed() {
+        return cpuGuarantee;
+    }
+
+    /**
+     * Returns the first pending topology in {@link PQsortByPriorityAndSubmittionTime} order.
+     *
+     * @param cluster the scheduling state to inspect
+     * @return the next topology to schedule, or null if none is pending
+     */
+    public TopologyDetails getNextTopologyToSchedule(ISchedulingState cluster) {
+        TreeSet<TopologyDetails> pending = getPendingTopologies(cluster);
+        return pending.isEmpty() ? null : pending.first();
+    }
+
+    public boolean hasTopologyNeedSchedule(ISchedulingState cluster) {
+        return !getPendingTopologies(cluster).isEmpty();
+    }
+
+    /**
+     * Returns the last running topology in {@link PQsortByPriorityAndSubmittionTime} order: the one
+     * with the numerically largest priority value, and on a tie the shortest uptime.
+     *
+     * @param cluster the scheduling state to inspect
+     * @return that topology, or null if this user has no running topologies
+     */
+    public TopologyDetails getRunningTopologyWithLowestPriority(ISchedulingState cluster) {
+        TreeSet<TopologyDetails> queue = getRunningTopologies(cluster);
+        if (queue.isEmpty()) {
+            return null;
+        }
+        return queue.last();
+    }
+
+    @Override
+    public int hashCode() {
+        return this.userId.hashCode();
+    }
+
+    /**
+     * Users are equal when their ids are equal; guarantees and unsuccessful topologies are ignored.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof User)) {
+            return false;
+        }
+        return this.getId().equals(((User) other).getId());
+    }
+
+    @Override
+    public String toString() {
+        return this.userId;
+    }
+
+    public String getDetailedInfo() {
+        String ret = "\nUser: " + userId;
+        ret += "\n - " + " Resource Pool: " + cpuGuarantee + "% " + memoryGuarantee + "MB ";
+        ret += "\n - " + " Unsuccess Queue: " + unsuccess + " size: " + unsuccess.size();
+        return ret;
+    }
+
+    /**
+     * Formats {@code "<id> - <average utilization> "} for each user, in iteration order.
+     */
+    public static String getResourcePoolAverageUtilizationForUsers(
+        Collection<User> users, Cluster cluster) {
+        StringBuilder ret = new StringBuilder();
+        for (User user : users) {
+            ret.append(user.getId())
+                .append(" - ")
+                .append(user.getResourcePoolAverageUtilization(cluster))
+                .append(' ');
+        }
+        return ret.toString();
+    }
+
+    /**
+     * Comparator that sorts topologies by priority and then by uptime. Numerically smaller priority
+     * values sort first; on a priority tie the topology with the longer uptime sorts first, and the
+     * topology id breaks any remaining tie.
+     */
+    static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> {
+
+        @Override
+        public int compare(TopologyDetails topo1, TopologyDetails topo2) {
+            if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
+                return 1;
+            } else if (topo1.getTopologyPriority() < topo2.getTopologyPriority()) {
+                return -1;
+            } else {
+                if (topo1.getUpTime() > topo2.getUpTime()) {
+                    return -1;
+                } else if (topo1.getUpTime() < topo2.getUpTime()) {
+                    return 1;
+                } else {
+                    return topo1.getId().compareTo(topo2.getId());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 182017b..3abc65e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -18,18 +18,17 @@
 
 package org.apache.storm.scheduler.resource.strategies.eviction;
 
+import java.util.Collection;
+import java.util.Map;
+
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Map;
-
 public class DefaultEvictionStrategy implements IEvictionStrategy {
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultEvictionStrategy.class);
@@ -39,49 +38,35 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
     private RAS_Nodes nodes;
 
     @Override
-    public void prepare(SchedulingState schedulingState) {
-        this.cluster = schedulingState.cluster;
-        this.userMap = schedulingState.userMap;
-        this.nodes = schedulingState.nodes;
-    }
-
-    @Override
-    public boolean makeSpaceForTopo(TopologyDetails td) {
+    public boolean makeSpaceForTopo(TopologyDetails td, Cluster schedulingState, Map<String, User> userMap) {
+        this.cluster = schedulingState;
+        this.userMap = userMap;
+        this.nodes = new RAS_Nodes(schedulingState);
         LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
         User submitter = this.userMap.get(td.getTopologySubmitter());
-        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null
-                || submitter.getCPUResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
+        if (submitter.getCpuResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
             return false;
         }
 
-        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
-        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCpuResourceGuaranteed();
+        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap())
+            / submitter.getMemoryResourceGuaranteed();
 
         User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
         //check if user has enough resource under his or her resource guarantee to schedule topology
-        if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
+        if ((1.0 - submitter.getCpuResourcePoolUtilization(schedulingState)) >= cpuNeeded
+            && (1.0 - submitter.getMemoryResourcePoolUtilization(schedulingState)) >= memoryNeeded) {
             if (evictUser != null) {
-                TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
-                LOG.debug("Running Topology {} from user {} is still within user's resource guarantee thus, POTENTIALLY evicting Topology {} from user {} since:" +
-                                "\n(1.0 - submitter.getCPUResourcePoolUtilization()) = {} >= cpuNeeded = {}" +
-                                "\nand" +
-                                "\n(1.0 - submitter.getMemoryResourcePoolUtilization()) = {} >= memoryNeeded = {}"
-                        ,td, submitter, topologyEvict, evictUser, (1.0 - submitter.getCPUResourcePoolUtilization())
-                        , cpuNeeded, (1.0 - submitter.getMemoryResourcePoolUtilization()), memoryNeeded);
+                TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority(schedulingState);
                 evictTopology(topologyEvict);
                 return true;
             }
         } else {
             if (evictUser != null) {
-                if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
-                    TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
-                    LOG.debug("POTENTIALLY Evicting Topology {} from user {} since:" +
-                                    "\n((evictUser.getResourcePoolAverageUtilization() - 1.0) = {}" +
-                                    "\n(cpuNeeded + memoryNeeded) / 2) = {} and (submitter.getResourcePoolAverageUtilization() - 1.0)) = {} Thus," +
-                                    "\n(evictUser.getResourcePoolAverageUtilization() - 1.0) = {} > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)) = {}"
-                            ,topologyEvict, evictUser, (evictUser.getResourcePoolAverageUtilization() - 1.0), ((cpuNeeded + memoryNeeded) / 2)
-                            , (submitter.getResourcePoolAverageUtilization() - 1.0), (evictUser.getResourcePoolAverageUtilization() - 1.0)
-                            , (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)));
+                if ((evictUser.getResourcePoolAverageUtilization(schedulingState) - 1.0)
+                    > (((cpuNeeded + memoryNeeded) / 2)
+                        + (submitter.getResourcePoolAverageUtilization(schedulingState) - 1.0))) {
+                    TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority(schedulingState);
                     evictTopology(topologyEvict);
                     return true;
                 }
@@ -90,12 +75,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
         //See if there is a lower priority topology that can be evicted from the current user
         //topologies should already be sorted in order of increasing priority.
         //Thus, topology at the front of the queue has the lowest priority
-        for (TopologyDetails topo : submitter.getTopologiesRunning()) {
+        for (TopologyDetails topo : submitter.getRunningTopologies(schedulingState)) {
             //check to if there is a topology with a lower priority we can evict
             if (topo.getTopologyPriority() > td.getTopologyPriority()) {
-                LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower priority than topology {}"
-                        , topo, submitter, topo, td);
-                        evictTopology(topo);
+                LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower "
+                        + "priority than topology {}", topo, submitter, topo, td);
+                evictTopology(topo);
                 return true;
             }
         }
@@ -104,19 +89,18 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
 
     private void evictTopology(TopologyDetails topologyEvict) {
         Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
-        User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
 
-        LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, topologyEvict.getTopologySubmitter());
+        LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
+            topologyEvict.getTopologySubmitter());
         this.nodes.freeSlots(workersToEvict);
-        submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
     }
 
     private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
         double most = 0.0;
         User mostOverUser = null;
         for (User user : this.userMap.values()) {
-            double over = user.getResourcePoolAverageUtilization() - 1.0;
-            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+            double over = user.getResourcePoolAverageUtilization(cluster) - 1.0;
+            if ((over > most) && (!user.getRunningTopologies(cluster).isEmpty())) {
                 most = over;
                 mostOverUser = user;
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
index 9499424..2947f7a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -18,25 +18,20 @@
 
 package org.apache.storm.scheduler.resource.strategies.eviction;
 
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.User;
 
 public interface IEvictionStrategy {
-
-    /**
-     * Initialization
-     */
-    public void prepare(SchedulingState schedulingState);
-
     /**
      * This method when invoked should attempt to make space on the cluster so that the topology specified can be scheduled
      * @param td the topology to make space for
      * @return return true to indicate that space has been made for topology and try schedule topology td again.
-     * Return false to inidcate that no space could be made for the topology on the cluster and the scheduler should give up
+     * Return false to indicate that no space could be made for the topology on the cluster and the scheduler should give up
      * trying to schedule the topology for this round of scheduling.  This method will be invoked until the topology indicated
      * could be scheduled or the method returns false
      */
-    public boolean makeSpaceForTopo(TopologyDetails td);
-
-
+    public boolean makeSpaceForTopo(TopologyDetails td, Cluster schedulingState, Map<String, User> userMap);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index e3109d5..b6d8ada 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -18,55 +18,46 @@
 
 package org.apache.storm.scheduler.resource.strategies.priority;
 
-import org.apache.storm.scheduler.Cluster;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultSchedulingPriorityStrategy.class);
 
-    private Cluster cluster;
-    private Map<String, User> userMap;
-
     @Override
-    public void prepare(SchedulingState schedulingState) {
-        this.cluster = schedulingState.cluster;
-        this.userMap = schedulingState.userMap;
-    }
-
-    @Override
-    public TopologyDetails getNextTopologyToSchedule() {
-        User nextUser = this.getNextUser();
+    public TopologyDetails getNextTopologyToSchedule(ISchedulingState schedulingState, Map<String, User> userMap) {
+        User nextUser = getNextUser(schedulingState, userMap);
         if (nextUser == null) {
             return null;
         }
-        return nextUser.getNextTopologyToSchedule();
+        return nextUser.getNextTopologyToSchedule(schedulingState);
     }
 
-    public User getNextUser() {
-        Double least = Double.POSITIVE_INFINITY;
+    public User getNextUser(ISchedulingState cluster, Map<String, User> userMap) {
+        double least = Double.POSITIVE_INFINITY;
         User ret = null;
-        for (User user : this.userMap.values()) {
-            if (user.hasTopologyNeedSchedule()) {
-                Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+        final double totalCpu = cluster.getClusterTotalCpuResource();
+        final double totalMem = cluster.getClusterTotalMemoryResource();
+        for (User user : userMap.values()) {
+            if (user.hasTopologyNeedSchedule(cluster)) {
+                double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization(cluster);
                 if (least > userResourcePoolAverageUtilization) {
                     ret = user;
                     least = userResourcePoolAverageUtilization;
-                }
-                // if ResourcePoolAverageUtilization is equal to the user that is being compared
-                else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
-                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+                    // if ResourcePoolAverageUtilization is equal to the user that is being compared
+                    double currentCpuPercentage = ret.getCpuResourceGuaranteed() / totalCpu;
+                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / totalMem;
                     double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
 
-                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+                    double userCpuPercentage = user.getCpuResourceGuaranteed() / totalCpu;
+                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / totalMem;
                     double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
                     if (userAvgPercentage > currentAvgPercentage) {
                         ret = user;

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index ffb463f..8654bb3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -18,19 +18,16 @@
 
 package org.apache.storm.scheduler.resource.strategies.priority;
 
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.User;
 
 public interface ISchedulingPriorityStrategy {
-
-    /**
-     * initializes
-     */
-    public void prepare(SchedulingState schedulingState);
-
     /**
      * Gets the next topology to schedule
      * @return return the next topology to schedule.  If there is no topologies left to schedule, return null
      */
-    public TopologyDetails getNextTopologyToSchedule();
+    public TopologyDetails getNextTopologyToSchedule(ISchedulingState schedulingState, Map<String, User> userMap);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..650f3df
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.storm.generated.ComponentType;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Component;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultResourceAwareStrategy implements IStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
+    private Cluster cluster;
+    private Map<String, List<String>> networkTopography;
+    private RAS_Nodes nodes;
+
+    private TreeSet<ObjectResources> sortedRacks = null;
+    private Map<String, TreeSet<ObjectResources>> rackIdToSortedNodes = new HashMap<>();
+
+    @VisibleForTesting
+    void prepare(Cluster cluster) {
+        this.cluster = cluster;
+        nodes = new RAS_Nodes(cluster);
+        networkTopography = cluster.getNetworkTopography();
+        logClusterInfo();
+    }
+
+    @Override
+    public void prepare(Map<String, Object> config) {
+        //NOOP
+    }
+
+    @Override
+    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+        prepare(cluster);
+        if (nodes.getNodes().size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return SchedulingResult.failure(
+                SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+        }
+        Collection<ExecutorDetails> unassignedExecutors =
+            new HashSet<>(this.cluster.getUnassignedExecutors(td));
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
+        List<Component> spouts = this.getSpouts(td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return SchedulingResult.failure(
+                SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+        }
+
+        //order executors to be scheduled
+        List<ExecutorDetails> orderedExecutors = orderExecutors(td, unassignedExecutors);
+
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
+
+        for (ExecutorDetails exec : orderedExecutors) {
+            LOG.debug(
+                "Attempting to schedule: {} of component {}[ REQ {} ]",
+                exec,
+                td.getExecutorToComponent().get(exec),
+                td.getTaskResourceReqList(exec));
+            scheduleExecutor(exec, td, scheduledTasks);
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+        // schedule left over system tasks
+        for (ExecutorDetails exec : executorsNotScheduled) {
+            scheduleExecutor(exec, td, scheduledTasks);
+        }
+
+        SchedulingResult result;
+        executorsNotScheduled.removeAll(scheduledTasks);
+        if (executorsNotScheduled.size() > 0) {
+            LOG.error("Not all executors successfully scheduled: {}", executorsNotScheduled);
+            result =
+                SchedulingResult.failure(
+                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+                    (td.getExecutors().size() - unassignedExecutors.size())
+                        + "/"
+                        + td.getExecutors().size()
+                        + " executors scheduled");
+        } else {
+            LOG.debug("All resources successfully scheduled!");
+            result = SchedulingResult.success("Fully Scheduled by DefaultResourceAwareStrategy");
+        }
+        return result;
+    }
+
+    /**
+     * Schedule executor exec from topology td.
+     *
+     * @param exec the executor to schedule
+     * @param td the topology executor exec is a part of
+     * @param scheduledTasks executors that have been scheduled
+     */
+    private void scheduleExecutor(
+        ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks) {
+        WorkerSlot targetSlot = findWorkerForExec(exec, td);
+        if (targetSlot != null) {
+            RAS_Node targetNode = idToNode(targetSlot.getNodeId());
+            targetNode.assignSingleExecutor(targetSlot, exec, td);
+            scheduledTasks.add(exec);
+            LOG.debug(
+                "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on "
+                    + "slot: {} on Rack: {}",
+                exec,
+                targetNode.getHostname(),
+                targetNode.getAvailableMemoryResources(),
+                targetNode.getAvailableCpuResources(),
+                targetNode.getTotalMemoryResources(),
+                targetNode.getTotalCpuResources(),
+                targetSlot,
+                nodeToRack(targetNode));
+        } else {
+            LOG.error("Not Enough Resources to schedule Task {}", exec);
+        }
+    }
+
+    /**
+     * Find a worker to schedule executor exec on
+     *
+     * @param exec the executor to schedule
+     * @param td the topology that the executor is a part of
+     * @return a worker to assign exec on. Returns null if a worker cannot be successfully found in cluster
+     */
+    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td) {
+        WorkerSlot ws = null;
+
+        // iterate through an ordered list of all racks available to make sure we cannot schedule
+        // the first executor in any rack before we "give up" the list is ordered in decreasing order
+        // of effective resources. With the rack in the front of the list having the most effective
+        // resources.
+        if (sortedRacks == null) {
+            sortedRacks = sortRacks(td.getId());
+        }
+
+        for (ObjectResources rack : sortedRacks) {
+            ws = this.getBestWorker(exec, td, rack.id);
+            if (ws != null) {
+                LOG.debug("best rack: {}", rack.id);
+                break;
+            }
+        }
+        return ws;
+    }
+
+    /**
+     * Get the best worker to assign executor exec on a rack
+     *
+     * @param exec the executor to schedule
+     * @param td the topology that the executor is a part of
+     * @param rackId the rack id of the rack to find a worker on
+     * @return a worker to assign executor exec to. Returns null if a worker cannot be successfully found on rack with
+     *     rackId
+     */
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId) {
+        if (!rackIdToSortedNodes.containsKey(rackId)) {
+            rackIdToSortedNodes.put(
+                rackId, sortNodes(this.getAvailableNodesFromRack(rackId), rackId, td.getId()));
+        }
+
+        TreeSet<ObjectResources> sortedNodes = rackIdToSortedNodes.get(rackId);
+
+        for (ObjectResources nodeResources : sortedNodes) {
+            RAS_Node node = nodes.getNodeById(nodeResources.id);
+            for (WorkerSlot ws : node.getSlotsAvailbleTo(td)) {
+                if (node.wouldFit(ws, exec, td)) {
+                    return ws;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * interface for calculating the number of existing executors scheduled on a object (rack or
+     * node).
+     */
+    private interface ExistingScheduleFunc {
+        int getNumExistingSchedule(String objectId);
+    }
+
+    /**
+     * a class to contain individual object resources as well as cumulative stats.
+     */
+    static class AllResources {
+        List<ObjectResources> objectResources = new LinkedList<ObjectResources>();
+        double availMemResourcesOverall = 0.0;
+        double totalMemResourcesOverall = 0.0;
+        double availCpuResourcesOverall = 0.0;
+        double totalCpuResourcesOverall = 0.0;
+        String identifier;
+
+        public AllResources(String identifier) {
+            this.identifier = identifier;
+        }
+    }
+
+    /**
+     * class to keep track of resources on a rack or node.
+     */
+    static class ObjectResources {
+        String id;
+        double availMem = 0.0;
+        double totalMem = 0.0;
+        double availCpu = 0.0;
+        double totalCpu = 0.0;
+        double effectiveResources = 0.0;
+
+        public ObjectResources(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return this.id;
+        }
+    }
+
+    /**
+     * Nodes are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled is already on the node in
+     * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a
+     * topology on the same node as the existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a node in descending
+     * order We calculate the resource availability percentage by dividing the resource availability
+     * that have exhausted or little of one of the resources mentioned above will be ranked after
+     * on the node by the resource availability of the entire rack By doing this calculation, nodes
+     * nodes that have more balanced resource availability. So we will be less likely to pick a node
+     * that have a lot of one resource but a low amount of another.
+     *
+     * @param availNodes a list of all the nodes we want to sort
+     * @param rackId the rack id availNodes are a part of
+     * @param topoId the topology that we are trying to schedule
+     * @return a sorted list of nodes.
+     */
+    private TreeSet<ObjectResources> sortNodes(
+        List<RAS_Node> availNodes, String rackId, final String topoId) {
+        AllResources allResources = new AllResources("RACK");
+        List<ObjectResources> nodes = allResources.objectResources;
+
+        for (RAS_Node rasNode : availNodes) {
+            String nodeId = rasNode.getId();
+            ObjectResources node = new ObjectResources(nodeId);
+
+            double availMem = rasNode.getAvailableMemoryResources();
+            node.availMem = availMem;
+            double availCpu = rasNode.getAvailableCpuResources();
+            node.availCpu = availCpu;
+            double totalMem = rasNode.getTotalMemoryResources();
+            node.totalMem = totalMem;
+            double totalCpu = rasNode.getTotalCpuResources();
+            node.totalCpu = totalCpu;
+
+            nodes.add(node);
+
+            allResources.availMemResourcesOverall += availMem;
+            allResources.availCpuResourcesOverall += availCpu;
+
+            allResources.totalMemResourcesOverall += totalMem;
+            allResources.totalCpuResourcesOverall += totalCpu;
+        }
+
+        LOG.debug(
+            "Rack {}: Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]",
+            rackId,
+            allResources.availCpuResourcesOverall,
+            allResources.availMemResourcesOverall,
+            allResources.totalCpuResourcesOverall,
+            allResources.totalMemResourcesOverall);
+
+        return sortObjectResources(
+            allResources,
+            new ExistingScheduleFunc() {
+                @Override
+                public int getNumExistingSchedule(String objectId) {
+
+                    //Get execs already assigned in rack
+                    Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
+                    if (cluster.getAssignmentById(topoId) != null) {
+                        for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
+                            cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                            WorkerSlot workerSlot = entry.getValue();
+                            ExecutorDetails exec = entry.getKey();
+                            if (workerSlot.getNodeId().equals(objectId)) {
+                                execs.add(exec);
+                            }
+                        }
+                    }
+                    return execs.size();
+                }
+            });
+    }
+
+    /**
+     * Racks are sorted by two criteria.
+     *
+     * <p>1) the number executors of the topology that needs to be scheduled is already on the rack in descending order.
+     * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same rack as the
+     * existing executors of the topology.
+     *
+     * <p>2) the subordinate/subservient resource availability percentage of a rack in descending order We calculate
+     * the resource availability percentage by dividing the resource availability on the rack by the resource
+     * availability of the  entire cluster By doing this calculation, racks that have exhausted or little of one of
+     * the resources mentioned above will be ranked after racks that have more balanced resource availability. So we
+     * will be less likely to pick a rack that have a lot of one resource but a low amount of another.
+     *
+     * @param topoId topology id
+     * @return a sorted list of racks
+     */
+    @VisibleForTesting
+    TreeSet<ObjectResources> sortRacks(final String topoId) {
+        AllResources allResources = new AllResources("Cluster");
+        List<ObjectResources> racks = allResources.objectResources;
+
+        final Map<String, String> nodeIdToRackId = new HashMap<String, String>();
+
+        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+            String rackId = entry.getKey();
+            List<String> nodeIds = entry.getValue();
+            ObjectResources rack = new ObjectResources(rackId);
+            racks.add(rack);
+            for (String nodeId : nodeIds) {
+                RAS_Node node = nodes.getNodeById(nodeHostnameToId(nodeId));
+                double availMem = node.getAvailableMemoryResources();
+                rack.availMem += availMem;
+                double availCpu = node.getAvailableCpuResources();
+                rack.availCpu += availCpu;
+                double totalMem = node.getTotalMemoryResources();
+                rack.totalMem += totalMem;
+                double totalCpu = node.getTotalCpuResources();
+                rack.totalCpu += totalCpu;
+
+                nodeIdToRackId.put(nodeId, rack.id);
+
+                allResources.availMemResourcesOverall += availMem;
+                allResources.availCpuResourcesOverall += availCpu;
+
+                allResources.totalMemResourcesOverall += totalMem;
+                allResources.totalCpuResourcesOverall += totalCpu;
+            }
+        }
+        LOG.debug(
+            "Cluster Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]",
+            allResources.availCpuResourcesOverall,
+            allResources.availMemResourcesOverall,
+            allResources.totalCpuResourcesOverall,
+            allResources.totalMemResourcesOverall);
+
+        return sortObjectResources(
+            allResources,
+            new ExistingScheduleFunc() {
+                @Override
+                public int getNumExistingSchedule(String objectId) {
+                    String rackId = objectId;
+                    //Get execs already assigned in rack
+                    Collection<ExecutorDetails> execs = new LinkedList<ExecutorDetails>();
+                    if (cluster.getAssignmentById(topoId) != null) {
+                        for (Map.Entry<ExecutorDetails, WorkerSlot> entry :
+                            cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                            String nodeId = entry.getValue().getNodeId();
+                            String hostname = idToNode(nodeId).getHostname();
+                            ExecutorDetails exec = entry.getKey();
+                            if (nodeIdToRackId.get(hostname) != null
+                                && nodeIdToRackId.get(hostname).equals(rackId)) {
+                                execs.add(exec);
+                            }
+                        }
+                    }
+                    return execs.size();
+                }
+            });
+    }
+
+    /**
+     * Sort objects by the following two criteria. 1) the number executors of the topology that needs
+     * to be scheduled is already on the object (node or rack) in descending order. The reasoning to
+     * sort based on criterion 1 is so we schedule the rest of a topology on the same object (node or
+     * rack) as the existing executors of the topology. 2) the subordinate/subservient resource
+     * availability percentage of a rack in descending order We calculate the resource availability
+     * percentage by dividing the resource availability of the object (node or rack) by the resource
+     * availability of the entire rack or cluster depending on if object references a node or a rack.
+     * By doing this calculation, objects (node or rack) that have exhausted or little of one of the
+     * resources mentioned above will be ranked after racks that have more balanced resource
+     * availability. So we will be less likely to pick a rack that have a lot of one resource but a
+     * low amount of another.
+     *
+     * @param allResources contains all individual ObjectResources as well as cumulative stats
+     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+     * @return a sorted list of ObjectResources
+     */
+    private TreeSet<ObjectResources> sortObjectResources(
+        final AllResources allResources, final ExistingScheduleFunc existingScheduleFunc) {
+
+        for (ObjectResources objectResources : allResources.objectResources) {
+            StringBuilder sb = new StringBuilder();
+            if (allResources.availCpuResourcesOverall <= 0.0
+                || allResources.availMemResourcesOverall <= 0.0) {
+                objectResources.effectiveResources = 0.0;
+            } else {
+                List<Double> values = new LinkedList<>();
+
+                //add cpu
+                double cpuPercent =
+                    (objectResources.availCpu / allResources.availCpuResourcesOverall) * 100.0;
+                values.add(cpuPercent);
+                sb.append(String.format("CPU %f(%f%%) ", objectResources.availCpu, cpuPercent));
+
+                //add memory
+                double memoryPercent =
+                    (objectResources.availMem / allResources.availMemResourcesOverall) * 100.0;
+                values.add(memoryPercent);
+                sb.append(String.format("MEM %f(%f%%) ", objectResources.availMem, memoryPercent));
+
+                objectResources.effectiveResources = Collections.min(values);
+            }
+            LOG.debug(
+                "{}: Avail [ {} ] Total [ CPU {} MEM {}] effective resources: {}",
+                objectResources.id,
+                sb.toString(),
+                objectResources.totalCpu,
+                objectResources.totalMem,
+                objectResources.effectiveResources);
+        }
+
+        TreeSet<ObjectResources> sortedObjectResources =
+            new TreeSet<>(( o1, o2) -> {
+                int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+                int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+                if (execsScheduled1 > execsScheduled2) {
+                    return -1;
+                } else if (execsScheduled1 < execsScheduled2) {
+                    return 1;
+                } else {
+                    if (o1.effectiveResources > o2.effectiveResources) {
+                        return -1;
+                    } else if (o1.effectiveResources < o2.effectiveResources) {
+                        return 1;
+                    } else {
+                        List<Double> o1Values = new LinkedList<Double>();
+                        List<Double> o2Values = new LinkedList<Double>();
+                        o1Values.add((o1.availCpu / allResources.availCpuResourcesOverall) * 100.0);
+                        o2Values.add((o2.availCpu / allResources.availCpuResourcesOverall) * 100.0);
+
+                        o1Values.add((o1.availMem / allResources.availMemResourcesOverall) * 100.0);
+                        o2Values.add((o2.availMem / allResources.availMemResourcesOverall) * 100.0);
+
+                        double o1Avg = ResourceUtils.avg(o1Values);
+                        double o2Avg = ResourceUtils.avg(o2Values);
+
+                        if (o1Avg > o2Avg) {
+                            return -1;
+                        } else if (o1Avg < o2Avg) {
+                            return 1;
+                        } else {
+                            return o1.id.compareTo(o2.id);
+                        }
+                    }
+                }
+            });
+        sortedObjectResources.addAll(allResources.objectResources);
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
+    /**
+     * Get the rack on which a node is a part of.
+     *
+     * @param node the node to find out which rack its on
+     * @return the rack id
+     */
+    private String nodeToRack(RAS_Node node) {
+        for (Map.Entry<String, List<String>> entry : networkTopography.entrySet()) {
+            if (entry.getValue().contains(node.getHostname())) {
+                return entry.getKey();
+            }
+        }
+        LOG.error("Node: {} not found in any racks", node.getHostname());
+        return null;
+    }
+
+    /**
+     * get a list nodes from a rack.
+     *
+     * @param rackId the rack id of the rack to get nodes from
+     * @return a list of nodes
+     */
+    private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
+        List<RAS_Node> retList = new ArrayList<>();
+        for (String nodeId : networkTopography.get(rackId)) {
+            retList.add(nodes.getNodeById(this.nodeHostnameToId(nodeId)));
+        }
+        return retList;
+    }
+
+    /**
+     * sort components by the number of in and out connections that need to be made, in descending order
+     *
+     * @param componentMap The components that need to be sorted
+     * @return a sorted set of components
+     */
+    private Set<Component> sortComponents(final Map<String, Component> componentMap) {
+        Set<Component> sortedComponents =
+            new TreeSet<>((o1, o2) -> {
+                int connections1 = 0;
+                int connections2 = 0;
+
+                for (String childId : union(o1.getChildren(), o1.getParents())) {
+                    connections1 +=
+                        (componentMap.get(childId).getExecs().size() * o1.getExecs().size());
+                }
+
+                for (String childId : union(o2.getChildren(), o2.getParents())) {
+                    connections2 +=
+                        (componentMap.get(childId).getExecs().size() * o2.getExecs().size());
+                }
+
+                if (connections1 > connections2) {
+                    return -1;
+                } else if (connections1 < connections2) {
+                    return 1;
+                } else {
+                    return o1.getId().compareTo(o2.getId());
+                }
+            });
+        sortedComponents.addAll(componentMap.values());
+        return sortedComponents;
+    }
+
+    private static <T> Set<T> union(Set<T> a, Set<T> b) {
+        HashSet<T> ret = new HashSet<>(a);
+        ret.addAll(b);
+        return ret;
+    }
+
+    /**
+     * Sort a component's neighbors by the number of connections it needs to make with this component.
+     *
+     * @param thisComp the component that we need to sort its neighbors
+     * @param componentMap all the components to sort
+     * @return a sorted set of components
+     */
+    private Set<Component> sortNeighbors(
+        final Component thisComp, final Map<String, Component> componentMap) {
+        Set<Component> sortedComponents =
+            new TreeSet<>((o1, o2) -> {
+                int connections1 = o1.getExecs().size() * thisComp.getExecs().size();
+                int connections2 = o2.getExecs().size() * thisComp.getExecs().size();
+                if (connections1 < connections2) {
+                    return -1;
+                } else if (connections1 > connections2) {
+                    return 1;
+                } else {
+                    return o1.getId().compareTo(o2.getId());
+                }
+            });
+        sortedComponents.addAll(componentMap.values());
+        return sortedComponents;
+    }
+
+    /**
+     * Order executors based on how many in and out connections it will potentially need to make, in descending order.
+     * First order components by the number of in and out connections it will have.  Then iterate through the sorted list of components.
+     * For each component sort the neighbors of that component by how many connections it will have to make with that component.
+     * Add an executor from this component and then from each neighboring component in sorted order.  Do this until there is nothing left to schedule
+     *
+     * @param td The topology the executors belong to
+     * @param unassignedExecutors a collection of unassigned executors that need to be unassigned. Should only try to
+     *     assign executors from this list
+     * @return a list of executors in sorted order
+     */
+    private List<ExecutorDetails> orderExecutors(
+        TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+        Map<String, Component> componentMap = td.getComponents();
+        List<ExecutorDetails> execsScheduled = new LinkedList<>();
+
+        Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
+        for (Component component : componentMap.values()) {
+            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
+            for (ExecutorDetails exec : component.getExecs()) {
+                if (unassignedExecutors.contains(exec)) {
+                    compToExecsToSchedule.get(component.getId()).add(exec);
+                }
+            }
+        }
+
+        Set<Component> sortedComponents = sortComponents(componentMap);
+        sortedComponents.addAll(componentMap.values());
+
+        for (Component currComp : sortedComponents) {
+            Map<String, Component> neighbors = new HashMap<String, Component>();
+            for (String compId : union(currComp.getChildren(), currComp.getParents())) {
+                neighbors.put(compId, componentMap.get(compId));
+            }
+            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
+            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
+
+            boolean flag = false;
+            do {
+                flag = false;
+                if (!currCompExesToSched.isEmpty()) {
+                    execsScheduled.add(currCompExesToSched.poll());
+                    flag = true;
+                }
+
+                for (Component neighborComp : sortedNeighbors) {
+                    Queue<ExecutorDetails> neighborCompExesToSched =
+                        compToExecsToSchedule.get(neighborComp.getId());
+                    if (!neighborCompExesToSched.isEmpty()) {
+                        execsScheduled.add(neighborCompExesToSched.poll());
+                        flag = true;
+                    }
+                }
+            } while (flag);
+        }
+        return execsScheduled;
+    }
+
+    /**
+     * Get a list of all the spouts in the topology.
+     *
+     * @param td topology to get spouts from
+     * @return a list of spouts
+     */
+    private List<Component> getSpouts(TopologyDetails td) {
+        List<Component> spouts = new ArrayList<>();
+
+        for (Component c : td.getComponents().values()) {
+            if (c.getType() == ComponentType.SPOUT) {
+                spouts.add(c);
+            }
+        }
+        return spouts;
+    }
+
+    /**
+     * Get the amount of resources available and total for each node.
+     *
+     * @return a String with cluster resource info for debug
+     */
+    private void logClusterInfo() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Cluster:");
+            for (Map.Entry<String, List<String>> clusterEntry : networkTopography.entrySet()) {
+                String rackId = clusterEntry.getKey();
+                LOG.debug("Rack: {}", rackId);
+                for (String nodeHostname : clusterEntry.getValue()) {
+                    RAS_Node node = idToNode(this.nodeHostnameToId(nodeHostname));
+                    LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
+                    LOG.debug(
+                        "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
+                        node.getAvailableMemoryResources(),
+                        node.getAvailableCpuResources(),
+                        node.totalSlotsFree());
+                    LOG.debug(
+                        "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
+                        node.getTotalMemoryResources(),
+                        node.getTotalCpuResources(),
+                        node.totalSlots());
+                }
+            }
+        }
+    }
+
+    /**
+     * hostname to Id.
+     *
+     * @param hostname the hostname to convert to node id
+     * @return the id of a node
+     */
+    public String nodeHostnameToId(String hostname) {
+        for (RAS_Node n : nodes.getNodes()) {
+            if (n.getHostname() == null) {
+                continue;
+            }
+            if (n.getHostname().equals(hostname)) {
+                return n.getId();
+            }
+        }
+        LOG.error("Cannot find Node with hostname {}", hostname);
+        return null;
+    }
+
+    /**
+     * Find RAS_Node for specified node id.
+     *
+     * @param id the node/supervisor id to lookup
+     * @return a RAS_Node object
+     */
+    public RAS_Node idToNode(String id) {
+        RAS_Node ret = nodes.getNodeById(id);
+        if (ret == null) {
+            LOG.error("Cannot find Node with Id: {}", id);
+        }
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
new file mode 100644
index 0000000..53859fd
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+
+/**
+ * An interface for implementing different scheduling strategies for resource aware scheduling.
+ * In the future strategies will be pluggable.
+ */
+public interface IStrategy {
+
+    /**
+     * Prepare the strategy for scheduling.
+     * @param config the cluster configuration
+     */
+    void prepare(Map<String, Object> config);
+
+    /**
+     * Calculate a scheduling for topology {@code td}.  Cluster will reject any changes that are
+     * not for the given topology.  Any changes made to the cluster will be committed if the scheduling is successful.
+     * @param schedulingState the current state of the cluster
+     * @param td the topology to schedule for
+     * @return a SchedulingResult object containing a SchedulingStatus that indicates whether scheduling was
+     *     successful.
+     */
+    SchedulingResult schedule(Cluster schedulingState, TopologyDetails td);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TestCgroups.java b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
index 0a57cc5..c785dde 100644
--- a/storm-server/src/test/java/org/apache/storm/TestCgroups.java
+++ b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
@@ -60,11 +60,8 @@ public class TestCgroups {
         CgroupManager manager = new CgroupManager();
         manager.prepare(config);
 
-        Map<String, Number> resourcesMap = new HashMap<>();
-        resourcesMap.put("cpu", 200);
-        resourcesMap.put("memory", 1024);
         String workerId = UUID.randomUUID().toString();
-        manager.reserveResourcesForWorker(workerId, resourcesMap);
+        manager.reserveResourcesForWorker(workerId, 1024, 200);
 
         List<String> commandList = manager.getLaunchCommand(workerId, new ArrayList<String>());
         StringBuilder command = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
new file mode 100644
index 0000000..0458ca4
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class NimbusTest {
+    @Test
+    public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
+        // Validation must reject this topology: a component requests 129.0 MB of on-heap memory,
+        // which is larger than the TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB limit of 128.0 MB.
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+        config1.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config1.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+
+        config1.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config1.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+        config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+        config1.put(Config.TOPOLOGY_PRIORITY, 0);
+        config1.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+        config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
+        try {
+            Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
+            fail("Expected IllegalArgumentException: on-heap memory 129.0 MB > max heap 128.0 MB");
+        } catch (IllegalArgumentException expected) {
+            // Expected: the max-heap-size validation rejected the topology.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index dd267e3..fe0a993 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -416,6 +416,7 @@ public class BasicContainerTest {
                     "-Dlog4j.configurationFile=" + workerConf,
                     "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
                     "-Dstorm.local.dir=" + stormLocal,
+                    "-Dworker.memory_limit_mb=768",
                     "org.apache.storm.LogWriter",
                     "java",
                     "-server",
@@ -430,6 +431,7 @@ public class BasicContainerTest {
                     "-Dlog4j.configurationFile=" + workerConf,
                     "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
                     "-Dstorm.local.dir=" + stormLocal,
+                    "-Dworker.memory_limit_mb=768",
                     "-Dtesting=true",
                     "-Djava.library.path=JLP",
                     "-Dstorm.conf.file=",

http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
new file mode 100644
index 0000000..9c01a2a
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Cluster}.
+ */
+public class ClusterTest {
+
+    /** Default worker memory in MB; this should match the value in Cluster.getAssignedMemoryForSlot. */
+    private static final double TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768.0;
+
+    private Map<String, Object> getConfig(String key, Object value) {
+        Map<String, Object> topConf = getEmptyConfig();
+        topConf.put(key, value);
+        return topConf;
+    }
+
+    private Map<String, Object> getEmptyConfig() {
+        Map<String, Object> topConf = new HashMap<>();
+        return topConf;
+    }
+
+    private Map<String, Object> getPopulatedConfig() {
+        Map<String, Object> topConf = new HashMap<>();
+        topConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m");
+        topConf.put(Config.WORKER_GC_CHILDOPTS, "-Xmx256m");
+        topConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m");
+        topConf.put(Config.WORKER_CHILDOPTS, "-Xmx768m");
+        topConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024);
+        topConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m");
+        return topConf;
+    }
+
+    /**
+     * Test Cluster.getAssignedMemoryForSlot with a single config value set.
+     * @param key - the config key to set
+     * @param value - the config value to set (a childopts String or a numeric value)
+     * @param expectedValue - the expected result
+     */
+    private void singleValueTest(String key, Object value, double expectedValue) {
+        Map<String, Object> topConf = getConfig(key, value);
+        Assert.assertEquals(expectedValue, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_allNull() {
+        Map<String, Object> topConf = getEmptyConfig();
+        Assert.assertEquals(TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerGcChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m", 128.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerGcChildopts() {
+        singleValueTest(Config.WORKER_GC_CHILDOPTS, "-Xmx256m", 256.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m", 512.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerChildopts() {
+        singleValueTest(Config.WORKER_CHILDOPTS, "-Xmx768m", 768.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerHeapMemoryMb() {
+        // Unlike the childopts keys, WORKER_HEAP_MEMORY_MB takes a numeric value.
+        singleValueTest(Config.WORKER_HEAP_MEMORY_MB, 1024, 1024.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m",
+                TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_all() {
+        Map<String, Object> topConf = getPopulatedConfig();
+        Assert.assertEquals(128.0 + 64.0, Cluster.getAssignedMemoryForSlot(topConf).doubleValue(), 0);
+    }
+}