Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:36 UTC

[08/23] storm git commit: made scheduling, eviction, and priority strategies pluggable

made scheduling, eviction, and priority strategies pluggable
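
With this change the Resource Aware Scheduler resolves all three policies from
configuration at runtime instead of hard-coding them. A minimal sketch of how a
cluster operator could swap in custom implementations (MyPriorityStrategy and
MyEvictionStrategy are hypothetical classes implementing the new interfaces
added below):

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.Config;

    // scheduler-side configuration, e.g. merged from storm.yaml
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY,
            "com.mycompany.scheduling.MyPriorityStrategy");
    conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY,
            "com.mycompany.scheduling.MyEvictionStrategy");

The per-topology scheduling strategy is configured separately through
topology.scheduler.strategy (see the new Config.setTopologyStrategy setter
below).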


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4cd5efa3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4cd5efa3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4cd5efa3

Branch: refs/heads/master
Commit: 4cd5efa3c5114413cf82ced7d3041e258175a5c0
Parents: 9d3c864
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Nov 24 16:36:36 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:10 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |   3 +
 storm-core/src/jvm/backtype/storm/Config.java   |  37 +-
 .../resource/ResourceAwareScheduler.java        | 191 ++++----
 .../backtype/storm/scheduler/resource/User.java |  19 -
 .../resource/strategies/IStrategy.java          |  38 --
 .../strategies/ResourceAwareStrategy.java       | 486 ------------------
 .../eviction/DefaultEvictionStrategy.java       | 115 +++++
 .../strategies/eviction/IEvictionStrategy.java  |  47 ++
 .../DefaultSchedulingPriorityStrategy.java      |  92 ++++
 .../priority/ISchedulingPriorityStrategy.java   |  41 ++
 .../DefaultResourceAwareStrategy.java           | 487 +++++++++++++++++++
 .../strategies/scheduling/IStrategy.java        |  43 ++
 .../storm/validation/ConfigValidation.java      |  25 +
 .../validation/ConfigValidationAnnotations.java |   9 +
 .../jvm/backtype/storm/TestConfigValidate.java  |  29 ++
 15 files changed, 1009 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 2a99ba6..cef09d3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -256,6 +256,9 @@ topology.component.resources.onheap.memory.mb: 128.0
 topology.component.resources.offheap.memory.mb: 0.0
 topology.component.cpu.pcore.percent: 10.0
 topology.worker.max.heap.size.mb: 768.0
+topology.scheduler.strategy: "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
+resource.aware.scheduler.eviction.strategy: "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
+resource.aware.scheduler.priority.strategy: "backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
 
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
 
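Note that these are only cluster-wide defaults: the eviction and priority
strategies are read from the scheduler's own configuration, while
topology.scheduler.strategy is read from each topology's configuration and can
therefore be overridden per topology (see ResourceAwareScheduler.java below).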

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index d9c8815..54af6fa 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -17,6 +17,9 @@
  */
 package backtype.storm;
 
+import backtype.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import backtype.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import backtype.storm.serialization.IKryoDecorator;
 import backtype.storm.serialization.IKryoFactory;
 import backtype.storm.validation.ConfigValidationAnnotations.*;
@@ -194,7 +197,8 @@ public class Config extends HashMap<String, Object> {
      * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
      * is used in the resource aware scheduler.
      */
-    @isString
+    @NotNull
+    @isImplementationOfClass(implementsClass = backtype.storm.networktopography.DNSToSwitchMapping.class)
     public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
 
     /**
@@ -1482,6 +1486,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
 
     /**
+     * The strategy to use when scheduling a topology with the Resource Aware Scheduler
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IStrategy.class)
+    public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
+
+    /**
      * How many executors to spawn for ackers.
      *
      * <p>By not setting this variable or setting it as null, Storm will set the number of acker executors
@@ -1943,6 +1954,20 @@ public class Config extends HashMap<String, Object> {
     public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = "resource.aware.scheduler.user.pools";
 
     /**
+     * The class that specifies the eviction strategy to use in ResourceAwareScheduler
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = IEvictionStrategy.class)
+    public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = "resource.aware.scheduler.eviction.strategy";
+
+    /**
+     * The class that specifies the scheduling priority strategy to use in ResourceAwareScheduler
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = ISchedulingPriorityStrategy.class)
+    public static final String RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY = "resource.aware.scheduler.priority.strategy";
+
+    /**
      * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler
      * to backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
@@ -2201,4 +2226,14 @@ public class Config extends HashMap<String, Object> {
     public void setTopologyPriority(int priority) {
         this.put(Config.TOPOLOGY_PRIORITY, priority);
     }
+
+    /**
+     * Sets the strategy class the Resource Aware Scheduler should use when scheduling this topology.
+     * A null class is ignored, leaving the cluster-wide default in effect.
+     */
+    public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
+        if(clazz != null) {
+            this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
+        }
+    }
 }
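
For example, a topology owner could select the default resource aware strategy
explicitly through the new setter (a sketch; only classes added in this commit
are used):

    import backtype.storm.Config;
    import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;

    Config conf = new Config();
    // stores the fully qualified class name under topology.scheduler.strategy;
    // a null argument is ignored, so the cluster-wide default applies
    conf.setTopologyStrategy(DefaultResourceAwareStrategy.class);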

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 934858e..f5c8354 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -19,7 +19,9 @@
 package backtype.storm.scheduler.resource;
 
 import backtype.storm.Config;
-import backtype.storm.scheduler.SchedulerAssignment;
+import backtype.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
+import backtype.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import backtype.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +32,7 @@ import backtype.storm.scheduler.IScheduler;
 import backtype.storm.scheduler.Topologies;
 import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;
+import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -98,87 +100,36 @@ public class ResourceAwareScheduler implements IScheduler {
 
         LOG.info("Nodes:\n{}", this.nodes);
 
-        LOG.info("getNextUser: {}", this.getNextUser());
+        //LOG.info("getNextUser: {}", this.getNextUser());
 
+        ISchedulingPriorityStrategy schedulingPriorityStrategy = null;
         while (true) {
             LOG.info("/*********** next scheduling iteration **************/");
 
-            User nextUser = this.getNextUser();
-            if (nextUser == null) {
+            if(schedulingPriorityStrategy == null) {
+                try {
+                    schedulingPriorityStrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+                } catch (RuntimeException e) {
+                    LOG.error("failed to create instance of priority strategy: {} with error: {}! No topology eviction will be done.",
+                            this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
+                    break;
+                }
+            }
+            //need to re-prepare since the scheduling state might have been restored
+            schedulingPriorityStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+            //Call scheduling priority strategy
+            TopologyDetails td = schedulingPriorityStrategy.getNextTopologyToSchedule();
+            if(td == null) {
                 break;
             }
-            TopologyDetails td = nextUser.getNextTopologyToSchedule();
             scheduleTopology(td);
         }
+
         //since the scheduling state might have been restored, we need to set the cluster and topologies again.
         cluster = this.cluster;
         topologies = this.topologies;
     }
 
-    private boolean makeSpaceForTopo(TopologyDetails td) {
-        LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
-        User submitter = this.userMap.get(td.getTopologySubmitter());
-        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
-            return false;
-        }
-
-        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
-        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
-
-        //user has enough resource under his or her resource guarantee to schedule topology
-        if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
-            User evictUser = this.findUserWithMostResourcesAboveGuarantee();
-            if (evictUser == null) {
-                LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
-                submitter.moveTopoFromPendingToAttempted(td, this.cluster);
-
-                return false;
-            }
-            TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
-            LOG.info("topology to evict: {}", topologyEvict);
-            evictTopology(topologyEvict);
-
-            LOG.info("Resources After eviction:\n{}", this.nodes);
-
-            return true;
-        } else {
-
-            if ((1.0 - submitter.getCPUResourcePoolUtilization()) < cpuNeeded) {
-
-            }
-
-            if ((1.0 - submitter.getMemoryResourcePoolUtilization()) < memoryNeeded) {
-
-            }
-            return false;
-
-        }
-    }
-
-    private void evictTopology(TopologyDetails topologyEvict) {
-        Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
-        User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
-
-        LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
-        LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
-        this.nodes.freeSlots(workersToEvict);
-        submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
-        LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
-    }
-
-    private User findUserWithMostResourcesAboveGuarantee() {
-        double most = 0.0;
-        User mostOverUser = null;
-        for (User user : this.userMap.values()) {
-            double over = user.getResourcePoolAverageUtilization() - 1.0;
-            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
-                most = over;
-                mostOverUser = user;
-            }
-        }
-        return mostOverUser;
-    }
-
     public void scheduleTopology(TopologyDetails td) {
         User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
         if (cluster.getUnassignedExecutors(td).size() > 0) {
@@ -190,9 +141,19 @@ public class ResourceAwareScheduler implements IScheduler {
             LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
 
             SchedulingState schedulingState = this.checkpointSchedulingState();
+            IStrategy RAStrategy = null;
+            try {
+                RAStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
+            } catch (RuntimeException e) {
+                LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
+                        td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage(), td.getName());
+                topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                return;
+            }
+            IEvictionStrategy evictionStrategy = null;
             while (true) {
-                //Need to reinitialize ResourceAwareStrategy with cluster and topologies in case scheduling state was restored
-                ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
+                //Need to re-prepare the scheduling strategy with cluster and topologies in case the scheduling state was restored
+                RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                 SchedulingResult result = RAStrategy.schedule(td);
                 LOG.info("scheduling result: {}", result);
                 if (result.isValid()) {
@@ -201,8 +162,8 @@ public class ResourceAwareScheduler implements IScheduler {
                             if (mkAssignment(td, result.getSchedulingResultMap())) {
                                 topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
                             } else {
-                                //resetAssignments(assignmentCheckpoint);
                                 this.restoreCheckpointSchedulingState(schedulingState);
+                                //since state was restored, we need to update topologySubmitter to the new User object in userMap
                                 topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                                 topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
                             }
@@ -218,7 +179,19 @@ public class ResourceAwareScheduler implements IScheduler {
                         break;
                     } else {
                         if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
-                            if (!this.makeSpaceForTopo(td)) {
+                            if(evictionStrategy == null) {
+                                try {
+                                    evictionStrategy = (IEvictionStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
+                                } catch (RuntimeException e) {
+                                    LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.",
+                                            this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e.getMessage());
+                                    topologySubmitter.moveTopoFromPendingToAttempted(td);
+                                    break;
+                                }
+                            }
+                            //need to re-prepare since the scheduling state might have been restored
+                            evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                            if (!evictionStrategy.makeSpaceForTopo(td)) {
                                 this.cluster.setStatus(td.getId(), result.getErrorMessage());
                                 this.restoreCheckpointSchedulingState(schedulingState);
                                 //since state was restored, we need to update topologySubmitter to the new User object in userMap
@@ -324,41 +297,41 @@ public class ResourceAwareScheduler implements IScheduler {
         return this.userMap;
     }
 
-    public User getNextUser() {
-        Double least = Double.POSITIVE_INFINITY;
-        User ret = null;
-        for (User user : this.userMap.values()) {
-            LOG.info("getNextUser {}", user.getDetailedInfo());
-            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
-            if (user.hasTopologyNeedSchedule()) {
-                Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
-                if(ret!=null) {
-                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
-                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
-                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
-
-                }
-                if (least > userResourcePoolAverageUtilization) {
-                    ret = user;
-                    least = userResourcePoolAverageUtilization;
-                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
-                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-                    double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
-
-                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-                    double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
-                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
-                    if (userAvgPercentage > currentAvgPercentage) {
-                        ret = user;
-                        least = userResourcePoolAverageUtilization;
-                    }
-                }
-            }
-        }
-        return ret;
-    }
+//    public User getNextUser() {
+//        Double least = Double.POSITIVE_INFINITY;
+//        User ret = null;
+//        for (User user : this.userMap.values()) {
+//            LOG.info("getNextUser {}", user.getDetailedInfo());
+//            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
+//            if (user.hasTopologyNeedSchedule()) {
+//                Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+//                if(ret!=null) {
+//                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
+//                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
+//                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
+//
+//                }
+//                if (least > userResourcePoolAverageUtilization) {
+//                    ret = user;
+//                    least = userResourcePoolAverageUtilization;
+//                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+//                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+//                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+//                    double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
+//
+//                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+//                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+//                    double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
+//                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
+//                    if (userAvgPercentage > currentAvgPercentage) {
+//                        ret = user;
+//                        least = userResourcePoolAverageUtilization;
+//                    }
+//                }
+//            }
+//        }
+//        return ret;
+//    }
 
     /**
     * Initialize scheduling and running queues

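Judging from the call sites in scheduleTopology above, a pluggable scheduling
strategy needs a no-arg constructor (it is instantiated reflectively via
Utils.newInstance), a prepare method that is re-run before every attempt, and a
schedule method returning a SchedulingResult. A rough skeleton (the class and
package are hypothetical; the signatures are inferred from the calls above):

    package com.mycompany.scheduling;

    import java.util.Map;
    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.scheduler.resource.RAS_Nodes;
    import backtype.storm.scheduler.resource.SchedulingResult;
    import backtype.storm.scheduler.resource.SchedulingStatus;
    import backtype.storm.scheduler.resource.User;
    import backtype.storm.scheduler.resource.strategies.scheduling.IStrategy;

    public class MyStrategy implements IStrategy {
        private Cluster cluster;

        @Override
        public void prepare(Topologies topologies, Cluster cluster,
                            Map<String, User> userMap, RAS_Nodes nodes) {
            // re-prepared before every scheduling attempt because the
            // scheduling state may have been restored from a checkpoint
            this.cluster = cluster;
        }

        @Override
        public SchedulingResult schedule(TopologyDetails td) {
            // a real strategy would build a WorkerSlot -> executors mapping
            // and return SchedulingResult.success(...) with it
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
                    "stub strategy does not place any executors");
        }
    }
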
http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 77a1dff..2d7c79f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -314,25 +314,6 @@ public class User {
         return ret;
     }
 
-    public static int cTo(TopologyDetails topo1, TopologyDetails topo2) {
-        if (topo1.getId().compareTo(topo2.getId()) == 0) {
-            return 0;
-        }
-        if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
-            return 1;
-        } else if (topo1.getTopologyPriority() < topo2.getTopologyPriority()) {
-            return -1;
-        } else {
-            if (topo1.getUpTime() > topo2.getUpTime()) {
-                return -1;
-            } else if (topo1.getUpTime() < topo2.getUpTime()) {
-                return 1;
-            } else {
-                return topo1.getId().compareTo(topo2.getId());
-            }
-        }
-    }
-
     /**
      * Comparator that sorts topologies by priority and then by submission time
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
deleted file mode 100644
index 722eddb..0000000
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource.strategies;
-
-import java.util.Collection;
-import java.util.Map;
-
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.ExecutorDetails;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.RAS_Node;
-import backtype.storm.scheduler.resource.SchedulingResult;
-
-/**
- * An interface to for implementing different scheduling strategies for the resource aware scheduling
- * In the future stategies will be pluggable
- */
-public interface IStrategy {
-
-    public SchedulingResult schedule(TopologyDetails td);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
deleted file mode 100644
index 812bf5d..0000000
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource.strategies;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import backtype.storm.scheduler.resource.RAS_Nodes;
-import backtype.storm.scheduler.resource.SchedulingResult;
-import backtype.storm.scheduler.resource.SchedulingStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.scheduler.Cluster;
-import backtype.storm.scheduler.ExecutorDetails;
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.Component;
-import backtype.storm.scheduler.resource.RAS_Node;
-
-public class ResourceAwareStrategy implements IStrategy {
-    private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareStrategy.class);
-    private Topologies _topologies;
-    private Cluster _cluster;
-    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
-    private Map<String, RAS_Node> _availNodes;
-    private RAS_Node refNode = null;
-    /**
-     * supervisor id -> Node
-     */
-    private Map<String, RAS_Node> _nodes;
-    private Map<String, List<String>> _clusterInfo;
-
-    private final double CPU_WEIGHT = 1.0;
-    private final double MEM_WEIGHT = 1.0;
-    private final double NETWORK_WEIGHT = 1.0;
-
-    public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
-        _topologies = topologies;
-        _cluster = cluster;
-        _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
-        _availNodes = this.getAvailNodes();
-        _clusterInfo = cluster.getNetworkTopography();
-        LOG.debug(this.getClusterInfo());
-    }
-
-    //the returned TreeMap keeps the Components sorted
-    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
-            Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
-        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
-        Integer rank = 0;
-        for (Component ras_comp : ordered__Component_list) {
-            retMap.put(rank, new ArrayList<ExecutorDetails>());
-            for(ExecutorDetails exec : ras_comp.execs) {
-                if(unassignedExecutors.contains(exec)) {
-                    retMap.get(rank).add(exec);
-                }
-            }
-            rank++;
-        }
-        return retMap;
-    }
-
-    public SchedulingResult schedule(TopologyDetails td) {
-        if (_availNodes.size() <= 0) {
-            LOG.warn("No available nodes to schedule tasks on!");
-            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
-        }
-        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
-        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
-        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
-        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
-        List<Component> spouts = this.getSpouts(_topologies, td);
-
-        if (spouts.size() == 0) {
-            LOG.error("Cannot find a Spout!");
-            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
-        }
-
-        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
-
-        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
-        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
-        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
-        //Pick the first executor with priority one, then the 1st exec with priority 2, so on an so forth. 
-        //Once we reach the last priority, we go back to priority 1 and schedule the second task with priority 1.
-        for (int i = 0; i < longestPriorityListSize; i++) {
-            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
-                Iterator<ExecutorDetails> it = entry.getValue().iterator();
-                if (it.hasNext()) {
-                    ExecutorDetails exec = it.next();
-                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
-                            new Object[] { exec, td.getExecutorToComponent().get(exec),
-                    td.getTaskResourceReqList(exec), entry.getKey() });
-                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-                    if (targetSlot != null) {
-                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
-                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                        }
-                       
-                        schedulerAssignmentMap.get(targetSlot).add(exec);
-                        targetNode.consumeResourcesforTask(exec, td);
-                        scheduledTasks.add(exec);
-                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                                targetNode, targetNode.getAvailableMemoryResources(),
-                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                                targetNode.getTotalCpuResources(), targetSlot);
-                    } else {
-                        LOG.error("Not Enough Resources to schedule Task {}", exec);
-                    }
-                    it.remove();
-                }
-            }
-        }
-
-        executorsNotScheduled.removeAll(scheduledTasks);
-        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
-        // schedule left over system tasks
-        for (ExecutorDetails exec : executorsNotScheduled) {
-            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-            if (targetSlot != null) {
-                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
-                if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                }
-               
-                schedulerAssignmentMap.get(targetSlot).add(exec);
-                targetNode.consumeResourcesforTask(exec, td);
-                scheduledTasks.add(exec);
-                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                        targetNode, targetNode.getAvailableMemoryResources(),
-                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                        targetNode.getTotalCpuResources(), targetSlot);
-            } else {
-                LOG.error("Not Enough Resources to schedule Task {}", exec);
-            }
-        }
-
-        SchedulingResult result;
-        executorsNotScheduled.removeAll(scheduledTasks);
-        if (executorsNotScheduled.size() > 0) {
-            LOG.error("Not all executors successfully scheduled: {}",
-                    executorsNotScheduled);
-            schedulerAssignmentMap = null;
-            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
-        } else {
-            LOG.debug("All resources successfully scheduled!");
-            result = SchedulingResult.success(schedulerAssignmentMap);
-        }
-        if (schedulerAssignmentMap == null) {
-            LOG.error("Topology {} not successfully scheduled!", td.getId());
-        }
-        return result;
-    }
-
-    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-      WorkerSlot ws;
-      // first scheduling
-      if (this.refNode == null) {
-          String clus = this.getBestClustering();
-          ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
-      } else {
-          ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
-      }
-      if(ws != null) {
-          this.refNode = this.idToNode(ws.getNodeId());
-      }
-      LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
-      return ws;
-    }
-
-    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
-    }
-
-    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        double taskMem = td.getTotalMemReqTask(exec);
-        double taskCPU = td.getTotalCpuReqTask(exec);
-        List<RAS_Node> nodes;
-        if(clusterId != null) {
-            nodes = this.getAvailableNodesFromCluster(clusterId);
-            
-        } else {
-            nodes = this.getAvailableNodes();
-        }
-        //First sort nodes by distance
-        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
-        for (RAS_Node n : nodes) {
-            if(n.getFreeSlots().size()>0) {
-                if (n.getAvailableMemoryResources() >= taskMem
-                        && n.getAvailableCpuResources() >= taskCPU) {
-                    double a = Math.pow(((taskCPU - n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1))
-                            * this.CPU_WEIGHT, 2);
-                    double b = Math.pow(((taskMem - n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1))
-                            * this.MEM_WEIGHT, 2);
-                    double c = 0.0;
-                    if(this.refNode != null) {
-                        c = Math.pow(this.distToNode(this.refNode, n)
-                                * this.NETWORK_WEIGHT, 2);
-                    }
-                    double distance = Math.sqrt(a + b + c);
-                    nodeRankMap.put(distance, n);
-                }
-            }
-        }
-        //Then, pick worker from closest node that satisfy constraints
-        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
-            RAS_Node n = entry.getValue();
-            for(WorkerSlot ws : n.getFreeSlots()) {
-                if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
-                    return ws;
-                }
-            }
-        }
-        return null;
-    }
-
-    private String getBestClustering() {
-        String bestCluster = null;
-        Double mostRes = 0.0;
-        for (Entry<String, List<String>> cluster : _clusterInfo
-                .entrySet()) {
-            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
-            if (clusterTotalRes > mostRes) {
-                mostRes = clusterTotalRes;
-                bestCluster = cluster.getKey();
-            }
-        }
-        return bestCluster;
-    }
-
-    private Double getTotalClusterRes(List<String> cluster) {
-        Double res = 0.0;
-        for (String node : cluster) {
-            res += _availNodes.get(this.NodeHostnameToId(node))
-                    .getAvailableMemoryResources()
-                    + _availNodes.get(this.NodeHostnameToId(node))
-                    .getAvailableCpuResources();
-        }
-        return res;
-    }
-
-    private Double distToNode(RAS_Node src, RAS_Node dest) {
-        if (src.getId().equals(dest.getId())) {
-            return 0.0;
-        } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
-            return 0.5;
-        } else {
-            return 1.0;
-        }
-    }
-
-    private String NodeToCluster(RAS_Node node) {
-        for (Entry<String, List<String>> entry : _clusterInfo
-                .entrySet()) {
-            if (entry.getValue().contains(node.getHostname())) {
-                return entry.getKey();
-            }
-        }
-        LOG.error("Node: {} not found in any clusters", node.getHostname());
-        return null;
-    }
-    
-    private List<RAS_Node> getAvailableNodes() {
-        LinkedList<RAS_Node> nodes = new LinkedList<>();
-        for (String clusterId : _clusterInfo.keySet()) {
-            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
-        }
-        return nodes;
-    }
-
-    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
-        List<RAS_Node> retList = new ArrayList<>();
-        for (String node_id : _clusterInfo.get(clus)) {
-            retList.add(_availNodes.get(this
-                    .NodeHostnameToId(node_id)));
-        }
-        return retList;
-    }
-
-    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
-        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
-        List<WorkerSlot> workers = new LinkedList<>();
-        for(RAS_Node node : nodes) {
-            workers.addAll(node.getFreeSlots());
-        }
-        return workers;
-    }
-
-    private List<WorkerSlot> getAvailableWorker() {
-        List<WorkerSlot> workers = new LinkedList<>();
-        for (String clusterId : _clusterInfo.keySet()) {
-            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
-        }
-        return workers;
-    }
-
-    /**
-     * In case in the future RAS can only use a subset of nodes
-     */
-    private Map<String, RAS_Node> getAvailNodes() {
-        return _nodes;
-    }
-
-    /**
-     * Breadth first traversal of the topology DAG
-     * @param topologies
-     * @param td
-     * @param spouts
-     * @return A partial ordering of components
-     */
-    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
-        // Since queue is a interface
-        Queue<Component> ordered__Component_list = new LinkedList<Component>();
-        HashMap<String, Component> visited = new HashMap<>();
-
-        /* start from each spout that is not visited, each does a breadth-first traverse */
-        for (Component spout : spouts) {
-            if (!visited.containsKey(spout.id)) {
-                Queue<Component> queue = new LinkedList<>();
-                queue.offer(spout);
-                while (!queue.isEmpty()) {
-                    Component comp = queue.poll();
-                    visited.put(comp.id, comp);
-                    ordered__Component_list.add(comp);
-                    List<String> neighbors = new ArrayList<>();
-                    neighbors.addAll(comp.children);
-                    neighbors.addAll(comp.parents);
-                    for (String nbID : neighbors) {
-                        if (!visited.containsKey(nbID)) {
-                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
-                            queue.offer(child);
-                        }
-                    }
-                }
-            }
-        }
-        return ordered__Component_list;
-    }
-
-    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
-        List<Component> spouts = new ArrayList<>();
-        for (Component c : topologies.getAllComponents().get(td.getId())
-                .values()) {
-            if (c.type == Component.ComponentType.SPOUT) {
-                spouts.add(c);
-            }
-        }
-        return spouts;
-    }
-
-    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
-        Integer mostNum = 0;
-        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
-            Integer numExecs = execs.size();
-            if (mostNum < numExecs) {
-                mostNum = numExecs;
-            }
-        }
-        return mostNum;
-    }
-
-    /**
-     * Get the remaining amount memory that can be assigned to a worker given the set worker max heap size
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
-     * @return The remaining amount of memory
-     */
-    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
-        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
-    }
-
-    /**
-     * Get the amount of memory already assigned to a worker
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
-     * @return the amount of memory
-     */
-    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        Double totalMem = 0.0;
-        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
-        if(execs != null) {
-            for(ExecutorDetails exec : execs) {
-                totalMem += td.getTotalMemReqTask(exec);
-            }
-        } 
-        return totalMem;
-    }
-
-    /**
-     * Checks whether we can schedule an Executor exec on the worker slot ws
-     * Only considers memory currently.  May include CPU in the future
-     * @param exec
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
-     * @return a boolean: True denoting the exec can be scheduled on ws and false if it cannot
-     */
-    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        boolean retVal = false;
-        if(this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
-            retVal = true;
-        }
-        return retVal;
-    }
-
-    /**
-     * Get the amount of resources available and total for each node
-     * @return a String with cluster resource info for debug
-     */
-    private String getClusterInfo() {
-        String retVal = "Cluster info:\n";
-        for(Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) {
-            String clusterId = clusterEntry.getKey();
-            retVal += "Rack: " + clusterId + "\n";
-            for(String nodeHostname : clusterEntry.getValue()) {
-                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
-                retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
-                retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
-                retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
-            }
-        }
-        return retVal;
-    }
-
-    /**
-     * hostname to Id
-     * @param hostname
-     * @return the id of a node
-     */
-    public String NodeHostnameToId(String hostname) {
-        for (RAS_Node n : _nodes.values()) {
-            if (n.getHostname() == null) {
-                continue;
-            }
-            if (n.getHostname().equals(hostname)) {
-                return n.getId();
-            }
-        }
-        LOG.error("Cannot find Node with hostname {}", hostname);
-        return null;
-    }
-
-    /**
-     * Find RAS_Node for specified node id
-     * @param id
-     * @return a RAS_Node object
-     */
-    public RAS_Node idToNode(String id) {
-        if(_nodes.containsKey(id) == false) {
-            LOG.error("Cannot find Node with Id: {}", id);
-            return null;
-        }
-        return _nodes.get(id);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
new file mode 100644
index 0000000..7ca7ac3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.eviction;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.ResourceUtils;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class DefaultEvictionStrategy implements IEvictionStrategy {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultEvictionStrategy.class);
+
+    private Topologies topologies;
+    private Cluster cluster;
+    private Map<String, User> userMap;
+    private RAS_Nodes nodes;
+
+    @Override
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        this.topologies = topologies;
+        this.cluster = cluster;
+        this.userMap = userMap;
+        this.nodes = nodes;
+    }
+
+    @Override
+    public boolean makeSpaceForTopo(TopologyDetails td) {
+        LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+        User submitter = this.userMap.get(td.getTopologySubmitter());
+        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+            return false;
+        }
+
+        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
+        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+
+        //the user has enough resources under his or her resource guarantee to schedule the topology
+        if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
+            User evictUser = this.findUserWithMostResourcesAboveGuarantee();
+            if (evictUser == null) {
+                LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
+                submitter.moveTopoFromPendingToAttempted(td, this.cluster);
+
+                return false;
+            }
+            TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+            LOG.info("topology to evict: {}", topologyEvict);
+            evictTopology(topologyEvict);
+
+            LOG.info("Resources After eviction:\n{}", this.nodes);
+
+            return true;
+        } else {
+
+            if ((1.0 - submitter.getCPUResourcePoolUtilization()) < cpuNeeded) {
+
+            }
+
+            if ((1.0 - submitter.getMemoryResourcePoolUtilization()) < memoryNeeded) {
+
+            }
+            return false;
+
+        }
+    }
+
+    private void evictTopology(TopologyDetails topologyEvict) {
+        Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
+        User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
+
+        LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
+        LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+        this.nodes.freeSlots(workersToEvict);
+        submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
+        LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
+    }
+
+    private User findUserWithMostResourcesAboveGuarantee() {
+        double most = 0.0;
+        User mostOverUser = null;
+        for (User user : this.userMap.values()) {
+            double over = user.getResourcePoolAverageUtilization() - 1.0;
+            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+                most = over;
+                mostOverUser = user;
+            }
+        }
+        return mostOverUser;
+    }
+}
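
To make the guarantee check in makeSpaceForTopo concrete (assuming
getCPUResourcePoolUtilization returns the fraction of the user's guarantee
currently in use): if a user is guaranteed 1000% CPU, 40% of that pool is in
use, and the submitted topology requests 500% CPU, then cpuNeeded = 500/1000 =
0.5 and 1.0 - 0.4 = 0.6 >= 0.5, so the topology fits within the guarantee and
the strategy evicts the lowest-priority running topology of whichever user is
furthest over their own guarantee.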

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
new file mode 100644
index 0000000..b787434
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.eviction;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+
+import java.util.Map;
+
+public interface IEvictionStrategy {
+
+    /**
+     * Initialization
+     */
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    /**
+     * When invoked, this method should attempt to make space on the cluster so that the specified topology can be scheduled.
+     * @param td the topology to make space for
+     * @return true to indicate that space has been made for the topology and that scheduling of topology td should be
+     * attempted again. Return false to indicate that no space could be made for the topology on the cluster and that the
+     * scheduler should give up trying to schedule the topology for this round of scheduling.  This method will be invoked
+     * repeatedly until the topology can be scheduled or the method returns false.
+     */
+    public boolean makeSpaceForTopo(TopologyDetails td);
+
+
+}
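
A custom eviction policy only has to implement these two methods. For
instance, a minimal strategy that disables eviction entirely (a hypothetical
example, not part of this commit):

    package com.mycompany.scheduling;

    import java.util.Map;
    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.scheduler.resource.RAS_Nodes;
    import backtype.storm.scheduler.resource.User;
    import backtype.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;

    public class NoEvictionStrategy implements IEvictionStrategy {
        @Override
        public void prepare(Topologies topologies, Cluster cluster,
                            Map<String, User> userMap, RAS_Nodes nodes) {
            // stateless: this strategy never frees any resources
        }

        @Override
        public boolean makeSpaceForTopo(TopologyDetails td) {
            // returning false tells the scheduler to give up on td
            // for this round of scheduling
            return false;
        }
    }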

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
new file mode 100644
index 0000000..5096fd6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.priority;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultSchedulingPriorityStrategy.class);
+
+    private Topologies topologies;
+    private Cluster cluster;
+    private Map<String, User> userMap;
+    private RAS_Nodes nodes;
+
+    @Override
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        this.topologies = topologies;
+        this.cluster = cluster;
+        this.userMap = userMap;
+        this.nodes = nodes;
+    }
+
+    @Override
+    public TopologyDetails getNextTopologyToSchedule() {
+        User nextUser = this.getNextUser();
+        if (nextUser == null) {
+            return null;
+        }
+        return nextUser.getNextTopologyToSchedule();
+    }
+
+    public User getNextUser() {
+        Double least = Double.POSITIVE_INFINITY;
+        User ret = null;
+        for (User user : this.userMap.values()) {
+            LOG.info("getNextUser {}", user.getDetailedInfo());
+            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
+            if (user.hasTopologyNeedSchedule()) {
+                Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+                if (ret != null) {
+                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
+                    //boxed Doubles must be compared by value; == would compare references
+                    LOG.info("{} equals {}: {}", least, userResourcePoolAverageUtilization, least.equals(userResourcePoolAverageUtilization));
+                    LOG.info("{} ~= {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
+                }
+                if (least > userResourcePoolAverageUtilization) {
+                    ret = user;
+                    least = userResourcePoolAverageUtilization;
+                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+                    double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
+
+                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+                    double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
+                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
+                    if (userAvgPercentage > currentAvgPercentage) {
+                        ret = user;
+                        least = userResourcePoolAverageUtilization;
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+}
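
To make the tie-breaking rule concrete, here is the arithmetic with
illustrative numbers (not taken from the patch). Suppose the cluster totals
are 1000 CPU and 4000 MB of memory, userA (the current candidate) is
guaranteed 400 CPU / 800 MB, userB is guaranteed 250 CPU / 1000 MB, and the
two users tie on resource pool utilization:

    double currentAvgPercentage = ((400.0 / 1000.0) + (800.0 / 4000.0)) / 2.0;  // userA: 0.30
    double userAvgPercentage    = ((250.0 / 1000.0) + (1000.0 / 4000.0)) / 2.0; // userB: 0.25
    // userAvgPercentage is not greater than currentAvgPercentage, so userA,
    // the user with the larger guaranteed share of the cluster, keeps priority.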

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
new file mode 100644
index 0000000..7e92b3d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.priority;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.User;
+
+import java.util.Map;
+
+public interface ISchedulingPriorityStrategy {
+
+    /**
+     * Initializes the strategy with the current state of the cluster, users, and nodes
+     */
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    /**
+     * @return the next topology to schedule, or null if there are no topologies left to schedule
+     */
+    public TopologyDetails getNextTopologyToSchedule();
+}
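
A strategy implementing this interface only has to hand back topologies one at
a time. As a sketch (class name illustrative, not part of this patch; imports
as in the interface above), a simple first-found ordering would be:

    public class FirstFoundPriorityStrategy implements ISchedulingPriorityStrategy {
        private Map<String, User> userMap;

        @Override
        public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
            this.userMap = userMap;
        }

        @Override
        public TopologyDetails getNextTopologyToSchedule() {
            for (User user : userMap.values()) {
                if (user.hasTopologyNeedSchedule()) {
                    return user.getNextTopologyToSchedule();
                }
            }
            return null; // no topologies left to schedule
        }
    }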

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..1950858
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.SchedulingResult;
+import backtype.storm.scheduler.resource.SchedulingStatus;
+import backtype.storm.scheduler.resource.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.scheduler.resource.Component;
+import backtype.storm.scheduler.resource.RAS_Node;
+
+public class DefaultResourceAwareStrategy implements IStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
+    private Topologies _topologies;
+    private Cluster _cluster;
+    //Map from supervisor id to the corresponding RAS_Node object
+    private Map<String, RAS_Node> _availNodes;
+    private RAS_Node refNode = null;
+    /**
+     * supervisor id -> Node
+     */
+    private Map<String, RAS_Node> _nodes;
+    private Map<String, List<String>> _clusterInfo;
+
+    private final double CPU_WEIGHT = 1.0;
+    private final double MEM_WEIGHT = 1.0;
+    private final double NETWORK_WEIGHT = 1.0;
+
+    public void prepare (Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
+        _topologies = topologies;
+        _cluster = cluster;
+        _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
+        _availNodes = this.getAvailNodes();
+        _clusterInfo = cluster.getNetworkTopography();
+        LOG.debug(this.getClusterInfo());
+    }
+
+    //the returned TreeMap keeps the components sorted by their traversal rank (priority)
+    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
+            Queue<Component> orderedComponents, Collection<ExecutorDetails> unassignedExecutors) {
+        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
+        Integer rank = 0;
+        for (Component ras_comp : orderedComponents) {
+            retMap.put(rank, new ArrayList<ExecutorDetails>());
+            for(ExecutorDetails exec : ras_comp.execs) {
+                if(unassignedExecutors.contains(exec)) {
+                    retMap.get(rank).add(exec);
+                }
+            }
+            rank++;
+        }
+        return retMap;
+    }
+
+    public SchedulingResult schedule(TopologyDetails td) {
+        if (_availNodes.size() <= 0) {
+            LOG.warn("No available nodes to schedule tasks on!");
+            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+        }
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
+        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+        Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
+        List<Component> spouts = this.getSpouts(_topologies, td);
+
+        if (spouts.size() == 0) {
+            LOG.error("Cannot find a Spout!");
+            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+        }
+
+        Queue<Component> orderedComponents = bfs(_topologies, td, spouts);
+
+        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(orderedComponents, unassignedExecutors);
+        Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
+        Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
+        //Pick the first executor with priority 1, then the first executor with priority 2, and so on.
+        //Once we reach the last priority, we go back and schedule the second executor with priority 1.
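+        //Example: with priorities {1: [e1, e4], 2: [e2], 3: [e3]} the traversal order is e1, e2, e3, e4.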
+        for (int i = 0; i < longestPriorityListSize; i++) {
+            for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
+                Iterator<ExecutorDetails> it = entry.getValue().iterator();
+                if (it.hasNext()) {
+                    ExecutorDetails exec = it.next();
+                    LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
+                            new Object[] { exec, td.getExecutorToComponent().get(exec),
+                    td.getTaskResourceReqList(exec), entry.getKey() });
+                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+                    if (targetSlot != null) {
+                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                        }
+                       
+                        schedulerAssignmentMap.get(targetSlot).add(exec);
+                        targetNode.consumeResourcesforTask(exec, td);
+                        scheduledTasks.add(exec);
+                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                                targetNode, targetNode.getAvailableMemoryResources(),
+                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                                targetNode.getTotalCpuResources(), targetSlot);
+                    } else {
+                        LOG.error("Not Enough Resources to schedule Task {}", exec);
+                    }
+                    it.remove();
+                }
+            }
+        }
+
+        executorsNotScheduled.removeAll(scheduledTasks);
+        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+        // schedule left over system tasks
+        for (ExecutorDetails exec : executorsNotScheduled) {
+            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+            if (targetSlot != null) {
+                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                if(!schedulerAssignmentMap.containsKey(targetSlot)) {
+                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+                }
+               
+                schedulerAssignmentMap.get(targetSlot).add(exec);
+                targetNode.consumeResourcesforTask(exec, td);
+                scheduledTasks.add(exec);
+                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                        targetNode, targetNode.getAvailableMemoryResources(),
+                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                        targetNode.getTotalCpuResources(), targetSlot);
+            } else {
+                LOG.error("Not Enough Resources to schedule Task {}", exec);
+            }
+        }
+
+        SchedulingResult result;
+        executorsNotScheduled.removeAll(scheduledTasks);
+        if (executorsNotScheduled.size() > 0) {
+            LOG.error("Not all executors successfully scheduled: {}",
+                    executorsNotScheduled);
+            schedulerAssignmentMap = null;
+            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
+        } else {
+            LOG.debug("All resources successfully scheduled!");
+            result = SchedulingResult.success(schedulerAssignmentMap);
+        }
+        if (schedulerAssignmentMap == null) {
+            LOG.error("Topology {} not successfully scheduled!", td.getId());
+        }
+        return result;
+    }
+
+    private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        WorkerSlot ws;
+        // no reference node yet: pick a cluster (rack) to anchor the schedule on
+        if (this.refNode == null) {
+            String clus = this.getBestClustering();
+            ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
+        } else {
+            ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+        }
+        if (ws != null) {
+            this.refNode = this.idToNode(ws.getNodeId());
+        }
+        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
+        return ws;
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
+    }
+
+    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        double taskMem = td.getTotalMemReqTask(exec);
+        double taskCPU = td.getTotalCpuReqTask(exec);
+        List<RAS_Node> nodes;
+        if (clusterId != null) {
+            nodes = this.getAvailableNodesFromCluster(clusterId);
+        } else {
+            nodes = this.getAvailableNodes();
+        }
+        //First rank eligible nodes by distance. Nodes at the same distance are kept in a list,
+        //since a TreeMap keyed only on the distance would silently drop ties.
+        TreeMap<Double, List<RAS_Node>> nodeRankMap = new TreeMap<>();
+        for (RAS_Node n : nodes) {
+            if (n.getFreeSlots().size() > 0) {
+                if (n.getAvailableMemoryResources() >= taskMem
+                        && n.getAvailableCpuResources() >= taskCPU) {
+                    double a = Math.pow(((taskCPU - n.getAvailableCpuResources()) / (n.getAvailableCpuResources() + 1))
+                            * this.CPU_WEIGHT, 2);
+                    double b = Math.pow(((taskMem - n.getAvailableMemoryResources()) / (n.getAvailableMemoryResources() + 1))
+                            * this.MEM_WEIGHT, 2);
+                    double c = 0.0;
+                    if (this.refNode != null) {
+                        c = Math.pow(this.distToNode(this.refNode, n)
+                                * this.NETWORK_WEIGHT, 2);
+                    }
+                    double distance = Math.sqrt(a + b + c);
+                    if (!nodeRankMap.containsKey(distance)) {
+                        nodeRankMap.put(distance, new LinkedList<RAS_Node>());
+                    }
+                    nodeRankMap.get(distance).add(n);
+                }
+            }
+        }
+        //Then, pick a worker from the closest node that satisfies the constraints
+        for (List<RAS_Node> rankedNodes : nodeRankMap.values()) {
+            for (RAS_Node n : rankedNodes) {
+                for (WorkerSlot ws : n.getFreeSlots()) {
+                    if (checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
+                        return ws;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    private String getBestClustering() {
+        String bestCluster = null;
+        Double mostRes = 0.0;
+        for (Entry<String, List<String>> cluster : _clusterInfo
+                .entrySet()) {
+            Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
+            if (clusterTotalRes > mostRes) {
+                mostRes = clusterTotalRes;
+                bestCluster = cluster.getKey();
+            }
+        }
+        return bestCluster;
+    }
+
+    private Double getTotalClusterRes(List<String> cluster) {
+        Double res = 0.0;
+        for (String node : cluster) {
+            res += _availNodes.get(this.NodeHostnameToId(node))
+                    .getAvailableMemoryResources()
+                    + _availNodes.get(this.NodeHostnameToId(node))
+                    .getAvailableCpuResources();
+        }
+        return res;
+    }
+
+    private Double distToNode(RAS_Node src, RAS_Node dest) {
+        if (src.getId().equals(dest.getId())) {
+            return 0.0;
+        } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
+            return 0.5;
+        } else {
+            return 1.0;
+        }
+    }
+
+    private String NodeToCluster(RAS_Node node) {
+        for (Entry<String, List<String>> entry : _clusterInfo
+                .entrySet()) {
+            if (entry.getValue().contains(node.getHostname())) {
+                return entry.getKey();
+            }
+        }
+        LOG.error("Node: {} not found in any clusters", node.getHostname());
+        return null;
+    }
+    
+    private List<RAS_Node> getAvailableNodes() {
+        LinkedList<RAS_Node> nodes = new LinkedList<>();
+        for (String clusterId : _clusterInfo.keySet()) {
+            nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
+        }
+        return nodes;
+    }
+
+    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+        List<RAS_Node> retList = new ArrayList<>();
+        for (String node_id : _clusterInfo.get(clus)) {
+            retList.add(_availNodes.get(this
+                    .NodeHostnameToId(node_id)));
+        }
+        return retList;
+    }
+
+    private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
+        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+        List<WorkerSlot> workers = new LinkedList<>();
+        for(RAS_Node node : nodes) {
+            workers.addAll(node.getFreeSlots());
+        }
+        return workers;
+    }
+
+    private List<WorkerSlot> getAvailableWorker() {
+        List<WorkerSlot> workers = new LinkedList<>();
+        for (String clusterId : _clusterInfo.keySet()) {
+            workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
+        }
+        return workers;
+    }
+
+    /**
+     * Indirection kept in case RAS is later restricted to using only a subset of nodes
+     */
+    private Map<String, RAS_Node> getAvailNodes() {
+        return _nodes;
+    }
+
+    /**
+     * Breadth first traversal of the topology DAG
+     * @param topologies
+     * @param td
+     * @param spouts
+     * @return A partial ordering of components
+     */
+    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
+        // Queue is an interface, so use LinkedList as the concrete type
+        Queue<Component> orderedComponents = new LinkedList<>();
+        HashMap<String, Component> visited = new HashMap<>();
+
+        /* start from each spout that is not visited; each does a breadth-first traverse */
+        for (Component spout : spouts) {
+            if (!visited.containsKey(spout.id)) {
+                Queue<Component> queue = new LinkedList<>();
+                queue.offer(spout);
+                while (!queue.isEmpty()) {
+                    Component comp = queue.poll();
+                    if (visited.containsKey(comp.id)) {
+                        //a component can be enqueued more than once before its first visit; skip duplicates
+                        continue;
+                    }
+                    visited.put(comp.id, comp);
+                    orderedComponents.add(comp);
+                    List<String> neighbors = new ArrayList<>();
+                    neighbors.addAll(comp.children);
+                    neighbors.addAll(comp.parents);
+                    for (String nbID : neighbors) {
+                        if (!visited.containsKey(nbID)) {
+                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
+                            queue.offer(child);
+                        }
+                    }
+                }
+            }
+        }
+        return orderedComponents;
+    }
+
+    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
+        List<Component> spouts = new ArrayList<>();
+        for (Component c : topologies.getAllComponents().get(td.getId())
+                .values()) {
+            if (c.type == Component.ComponentType.SPOUT) {
+                spouts.add(c);
+            }
+        }
+        return spouts;
+    }
+
+    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
+        Integer mostNum = 0;
+        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
+            Integer numExecs = execs.size();
+            if (mostNum < numExecs) {
+                mostNum = numExecs;
+            }
+        }
+        return mostNum;
+    }
+
+    /**
+     * Get the remaining amount of memory that can be assigned to a worker, given the configured worker max heap size
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return the remaining amount of memory
+     */
+    private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
+        return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
+    }
+
+    /**
+     * Get the amount of memory already assigned to a worker
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return the amount of memory
+     */
+    private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        Double totalMem = 0.0;
+        Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
+        if(execs != null) {
+            for(ExecutorDetails exec : execs) {
+                totalMem += td.getTotalMemReqTask(exec);
+            }
+        } 
+        return totalMem;
+    }
+
+    /**
+     * Checks whether executor exec can be scheduled on worker slot ws.
+     * Only memory is considered currently; CPU may be included in the future
+     * @param exec
+     * @param ws
+     * @param td
+     * @param scheduleAssignmentMap
+     * @return true if exec can be scheduled on ws, false otherwise
+     */
+    private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        return this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec);
+    }
+
+    /**
+     * Get the amount of resources available and total for each node
+     * @return a String with cluster resource info for debug
+     */
+    private String getClusterInfo() {
+        StringBuilder retVal = new StringBuilder("Cluster info:\n");
+        for (Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) {
+            String clusterId = clusterEntry.getKey();
+            retVal.append("Rack: ").append(clusterId).append("\n");
+            for (String nodeHostname : clusterEntry.getValue()) {
+                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
+                retVal.append("-> Node: ").append(node.getHostname()).append(" ").append(node.getId()).append("\n");
+                retVal.append("--> Avail Resources: {Mem ").append(node.getAvailableMemoryResources()).append(", CPU ").append(node.getAvailableCpuResources()).append("}\n");
+                retVal.append("--> Total Resources: {Mem ").append(node.getTotalMemoryResources()).append(", CPU ").append(node.getTotalCpuResources()).append("}\n");
+            }
+        }
+        return retVal.toString();
+    }
+
+    /**
+     * Find the id of the node with the given hostname
+     * @param hostname
+     * @return the id of the node, or null if no node with that hostname is known
+     */
+    public String NodeHostnameToId(String hostname) {
+        for (RAS_Node n : _nodes.values()) {
+            if (n.getHostname() == null) {
+                continue;
+            }
+            if (n.getHostname().equals(hostname)) {
+                return n.getId();
+            }
+        }
+        LOG.error("Cannot find Node with hostname {}", hostname);
+        return null;
+    }
+
+    /**
+     * Find RAS_Node for specified node id
+     * @param id
+     * @return the RAS_Node object, or null if no node with that id exists
+     */
+    public RAS_Node idToNode(String id) {
+        if (!_nodes.containsKey(id)) {
+            LOG.error("Cannot find Node with Id: {}", id);
+            return null;
+        }
+        return _nodes.get(id);
+    }
+}
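
The node ranking in getBestWorker is easiest to see with numbers. For an
illustrative task needing 20 CPU and 512 MB, a candidate node with 80 CPU and
2048 MB available, and the reference node on the same rack (distToNode = 0.5),
the distance works out as follows (all weights are 1.0):

    double a = Math.pow(((20.0 - 80.0) / (80.0 + 1)) * 1.0, 2);      // CPU term, ~0.549
    double b = Math.pow(((512.0 - 2048.0) / (2048.0 + 1)) * 1.0, 2); // memory term, ~0.562
    double c = Math.pow(0.5 * 1.0, 2);                               // network term, 0.25
    double distance = Math.sqrt(a + b + c);                          // ~1.17; smaller is better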

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
new file mode 100644
index 0000000..12e8ff3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.Map;
+
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.SchedulingResult;
+import backtype.storm.scheduler.resource.User;
+
+/**
+ * An interface for implementing different scheduling strategies for resource aware scheduling.
+ * Strategies implementing this interface are pluggable
+ */
+public interface IStrategy {
+
+    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+
+    public SchedulingResult schedule(TopologyDetails td);
+}
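
Any class implementing these two methods can act as a scheduling strategy. A
trivial stub (illustrative only, not part of this patch; imports as in
DefaultResourceAwareStrategy above) shows the contract:

    public class AlwaysFailStrategy implements IStrategy {
        @Override
        public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
            // stateless stub: nothing to initialize
        }

        @Override
        public SchedulingResult schedule(TopologyDetails td) {
            // a real strategy returns SchedulingResult.success(assignmentMap) when scheduling succeeds
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
                    "stub strategy never schedules anything");
        }
    }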

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 52c4ed1..b82a4ec 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -523,6 +523,31 @@ public class ConfigValidation {
         }
     }
 
+    public static class ImplementsClassValidator extends Validator {
+
+        Class classImplements;
+
+        public ImplementsClassValidator(Map<String, Object> params) {
+            this.classImplements = (Class) params.get(ConfigValidationAnnotations.ValidatorParams.IMPLEMENTS_CLASS);
+        }
+
+        @Override
+        public void validateField(String name, Object o) {
+            if(o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, String.class, o);
+            try {
+                Class objectClass = Class.forName((String) o);
+                if(!this.classImplements.isAssignableFrom(objectClass)) {
+                    throw new IllegalArgumentException("Field " + name + " with value " + o + " does not implement " + this.classImplements.getName());
+                }
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     /**
      * Methods for validating confs
      */
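
To see the new validator in action, a quick check (illustrative, not part of
this patch; imports omitted) of a class name against an expected interface:

    Map<String, Object> params = new HashMap<>();
    //the "implementsClass" key matches ValidatorParams.IMPLEMENTS_CLASS added in this commit
    params.put("implementsClass", IStrategy.class);
    ConfigValidation.ImplementsClassValidator validator = new ConfigValidation.ImplementsClassValidator(params);

    //passes: DefaultResourceAwareStrategy implements IStrategy
    validator.validateField("scheduler.strategy",
            "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");

    //throws IllegalArgumentException: java.lang.String does not implement IStrategy
    validator.validateField("scheduler.strategy", "java.lang.String");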

http://git-wip-us.apache.org/repos/asf/storm/blob/4cd5efa3/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
index c47f523..28707e4 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidationAnnotations.java
@@ -46,6 +46,7 @@ public class ConfigValidationAnnotations {
         static final String VALUE_TYPE = "valueType";
         static final String INCLUDE_ZERO = "includeZero";
         static final String ACCEPTED_VALUES = "acceptedValues";
+        static final String IMPLEMENTS_CLASS = "implementsClass";
     }
 
     /**
@@ -176,6 +177,14 @@ public class ConfigValidationAnnotations {
         boolean includeZero() default false;
     }
 
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
+    public @interface isImplementationOfClass {
+        Class validatorClass() default ConfigValidation.ImplementsClassValidator.class;
+
+        Class implementsClass();
+    }
+
     /**
      * Complex/custom type validators
      */