You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:31 UTC

[03/23] storm git commit: adding unit tests for STORM-898

adding unit tests for STORM-898


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3f55feef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3f55feef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3f55feef

Branch: refs/heads/master
Commit: 3f55feef43602beefebdb6ae20c64bceff2dbecb
Parents: 97b2c9a
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Nov 12 09:48:07 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:06:45 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |   1 +
 .../jvm/backtype/storm/scheduler/Cluster.java   |  17 ++
 .../storm/scheduler/SupervisorDetails.java      |  12 +-
 .../backtype/storm/scheduler/Topologies.java    |   9 +
 .../storm/scheduler/TopologyDetails.java        |  43 ++-
 .../storm/scheduler/resource/RAS_Node.java      | 163 ++--------
 .../storm/scheduler/resource/RAS_Nodes.java     | 149 +++++++++
 .../resource/ResourceAwareScheduler.java        | 285 ++++++++++++++---
 .../scheduler/resource/SchedulingResult.java    | 111 +++++++
 .../scheduler/resource/SchedulingStatus.java    |  40 +++
 .../backtype/storm/scheduler/resource/User.java | 122 +++++++-
 .../resource/strategies/IStrategy.java          |   3 +-
 .../strategies/ResourceAwareStrategy.java       |  17 +-
 .../scheduler/resource_aware_scheduler_test.clj | 126 +++++---
 .../jvm/backtype/storm/TestConfigValidate.java  |  58 ++++
 .../storm/scheduler/resource/Experiment.java    | 175 +++++++++++
 .../resource/TestResourceAwareScheduler.java    | 305 +++++++++++++++++++
 .../storm/scheduler/resource/TestUser.java      | 111 +++++++
 .../TestUtilsForResourceAwareScheduler.java     | 265 ++++++++++++++++
 19 files changed, 1742 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index aeda11c..2a99ba6 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -249,6 +249,7 @@ topology.disruptor.wait.timeout.millis: 1000
 topology.disruptor.batch.size: 100
 topology.disruptor.batch.timeout.millis: 1
 topology.disable.loadaware: false
+topology.priority: 30
 
 # Configs for Resource Aware Scheduler
 topology.component.resources.onheap.memory.mb: 128.0

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index f4d12d8..7f16b86 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -379,6 +379,16 @@ public class Cluster {
     }
 
     /**
+     * get slots used by a topology
+     */
+    public Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId) {
+        if (!this.assignments.containsKey(topologyId)) {
+            return null;
+        }
+        return this.assignments.get(topologyId).getSlots();
+    }
+
+    /**
      * Get a specific supervisor with the <code>nodeId</code>
      */
     public SupervisorDetails getSupervisorById(String nodeId) {
@@ -429,6 +439,13 @@ public class Cluster {
         return ret;
     }
 
+    public void setAssignments(Map<String, SchedulerAssignment> assignments) { // replaces every current assignment with a copy of the given ones
+        this.assignments = new HashMap<String, SchedulerAssignmentImpl>();
+        for(Map.Entry<String, SchedulerAssignment> entry : assignments.entrySet()) { // iterate the parameter, not the field that was just emptied
+            this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
+        }
+    }
+
     /**
      * Get all the supervisors.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
index a748e11..d93252f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
@@ -119,10 +119,18 @@ public class SupervisorDetails {
     }
 
     public Double getTotalMemory() {
-        return getTotalResource(Config.SUPERVISOR_MEMORY_CAPACITY_MB);
+        Double totalMemory = getTotalResource(Config.SUPERVISOR_MEMORY_CAPACITY_MB);
+        if(totalMemory == null) {
+            throw new IllegalStateException("default value for supervisor.memory.capacity.mb is not set!");
+        }
+        return totalMemory;
     }
 
     public Double getTotalCPU() {
-        return getTotalResource(Config.SUPERVISOR_CPU_CAPACITY);
+        Double totalCPU = getTotalResource(Config.SUPERVISOR_CPU_CAPACITY);
+        if(totalCPU == null) {
+            throw new IllegalStateException("default value for supervisor.cpu.capacity is not set!");
+        }
+        return totalCPU;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 443bf3f..0b4c0ca 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -67,4 +67,13 @@ public class Topologies {
         }
         return _allComponents;
     }
+
+    @Override
+    public String toString() { // one header line followed by one line per topology
+        StringBuilder ret = new StringBuilder("Topologies:\n"); // builder avoids re-copying the string on every iteration
+        for(TopologyDetails td : this.getTopologies()) {
+            ret.append(td.toString()).append("\n");
+        }
+        return ret.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index ac515b7..3931b43 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -63,17 +63,22 @@ public class TopologyDetails {
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) {
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
         this(topologyId, topologyConf, topology, numWorkers);
         this.executorToComponent = new HashMap<>(0);
         if (executorToComponents != null) {
             this.executorToComponent.putAll(executorToComponents);
         }
-        this.launchTime = launchTime;
         this.initResourceList();
         this.initConfigs();
     }
 
+    public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) {
+        this(topologyId, topologyConf, topology, numWorkers, executorToComponents);
+        this.launchTime = launchTime;
+    }
+
     public String getId() {
         return topologyId;
     }
@@ -410,13 +415,24 @@ public class TopologyDetails {
      * Add default resource requirements for a executor
      */
     public void addDefaultResforExec(ExecutorDetails exec) {
+
+        Double topologyComponentCpuPcorePercent = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+        if(topologyComponentCpuPcorePercent == null) {
+            LOG.warn("default value for topology.component.cpu.pcore.percent needs to be set!");
+        }
+        Double topologyComponentResourcesOffheapMemoryMb =  Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+        if(topologyComponentResourcesOffheapMemoryMb == null) {
+            LOG.warn("default value for topology.component.resources.offheap.memory.mb needs to be set!");
+        }
+        Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+        if(topologyComponentResourcesOnheapMemoryMb == null) {
+            LOG.warn("default value for topology.component.resources.onheap.memory.mb needs to be set!");
+        }
+
         Map<String, Double> defaultResourceList = new HashMap<>();
-        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
-                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
-        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
-                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null));
-        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
-                        Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null));
+        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topologyComponentCpuPcorePercent);
+        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topologyComponentResourcesOffheapMemoryMb);
+        defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topologyComponentResourcesOnheapMemoryMb);
         LOG.debug("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " +
                         "and CPU requirement: {}",
                 exec, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
@@ -426,12 +442,17 @@ public class TopologyDetails {
     }
 
     /**
-     * initializes the scheduler member variable by extracting what scheduler
-     * this topology is going to use from topologyConf
+     * initializes member variables
      */
     private void initConfigs() {
         this.topologyWorkerMaxHeapSize = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
+        if(this.topologyWorkerMaxHeapSize == null) {
+            LOG.warn("default value for topology.worker.max.heap.size.mb needs to be set!");
+        }
         this.topologyPriority = Utils.getInt(this.topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
+        if(this.topologyPriority == null) {
+            LOG.warn("default value for topology.priority needs to be set!");
+        }
     }
 
     /**
@@ -459,7 +480,7 @@ public class TopologyDetails {
 
     @Override
     public String toString() {
-        return "Name: " + this.getName() + " Priority: " + this.getTopologyPriority()
+        return "Name: " + this.getName() + " id: " + this.getId() + " Priority: " + this.getTopologyPriority()
                 + " Uptime: " + this.getUpTime() + " CPU: " + this.getTotalRequestedCpu()
                 + " Memory: " + (this.getTotalRequestedMemOffHeap() + this.getTotalRequestedMemOnHeap());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 1f2e795..b0bcc8a 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -21,6 +21,7 @@ package backtype.storm.scheduler.resource;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,14 +54,11 @@ public class RAS_Node {
     private Double _availMemory;
     private Double _availCPU;
     private List<WorkerSlot> _slots;
-    private List<ExecutorDetails> _execs;
-    private Map<WorkerSlot, List<ExecutorDetails>> _slotToExecs;
+    private Cluster _cluster;
 
     public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
-                    SupervisorDetails sup) {
+                    SupervisorDetails sup, Cluster cluster) {
         _slots = new ArrayList<WorkerSlot>();
-        _execs = new ArrayList<ExecutorDetails>();
-        _slotToExecs = new HashMap<WorkerSlot, List<ExecutorDetails>>();
         _nodeId = nodeId;
         _isAlive = isAlive;
         if (_isAlive && allPorts != null) {
@@ -72,10 +70,8 @@ public class RAS_Node {
             _availMemory = this.getTotalMemoryResources();
             _availCPU = this.getTotalCpuResources();
             _slots.addAll(_freeSlots);
-            for (WorkerSlot ws : _slots) {
-                _slotToExecs.put(ws, new ArrayList<ExecutorDetails>());
-            }
         }
+        this._cluster = cluster;
     }
 
     public String getId() {
@@ -90,6 +86,14 @@ public class RAS_Node {
         return _freeSlots;
     }
 
+    public Collection<WorkerSlot> getUsedSlots() {
+        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+        for(Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
+            ret.addAll(workers);
+        }
+        return ret;
+    }
+
     public boolean isAlive() {
         return _isAlive;
     }
@@ -138,7 +142,7 @@ public class RAS_Node {
         }
     }
 
-    private void addOrphanedSlot(WorkerSlot ws) {
+     void addOrphanedSlot(WorkerSlot ws) {
         if (_isAlive) {
             throw new IllegalArgumentException("Orphaned Slots " +
                     "only are allowed on dead nodes.");
@@ -153,7 +157,6 @@ public class RAS_Node {
             }
         }
         _freeSlots.add(ws);
-        _slotToExecs.put(ws, new ArrayList<ExecutorDetails>());
     }
 
     boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
@@ -175,14 +178,13 @@ public class RAS_Node {
 
     /**
      * Free all slots on this node.  This will update the Cluster too.
-     * @param cluster the cluster to be updated
      */
-    public void freeAllSlots(Cluster cluster) {
+    public void freeAllSlots() {
         if (!_isAlive) {
             LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
         }
         for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-            cluster.freeSlots(entry.getValue());
+            _cluster.freeSlots(entry.getValue());
             if (_isAlive) {
                 _freeSlots.addAll(entry.getValue());
             }
@@ -193,14 +195,13 @@ public class RAS_Node {
     /**
      * Frees a single slot in this node
      * @param ws the slot to free
-     * @param cluster the cluster to update
      */
-    public void free(WorkerSlot ws, Cluster cluster) {
+    public void free(WorkerSlot ws) {
         if (_freeSlots.contains(ws)) return;
         for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             Set<WorkerSlot> slots = entry.getValue();
             if (slots.remove(ws)) {
-                cluster.freeSlot(ws);
+                _cluster.freeSlot(ws);
                 if (_isAlive) {
                     _freeSlots.add(ws);
                 }
@@ -214,15 +215,14 @@ public class RAS_Node {
     /**
      * Frees all the slots for a topology.
      * @param topId the topology to free slots for
-     * @param cluster the cluster to update
      */
-    public void freeTopology(String topId, Cluster cluster) {
+    public void freeTopology(String topId) {
         Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
         if (slots == null || slots.isEmpty()) {
             return;
         }
         for (WorkerSlot ws : slots) {
-            cluster.freeSlot(ws);
+            _cluster.freeSlot(ws);
             if (_isAlive) {
                 _freeSlots.add(ws);
             }
@@ -257,8 +257,7 @@ public class RAS_Node {
         slot.allocateResource(onHeapMem, offHeapMem, cpu);
     }
 
-    public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors,
-                       Cluster cluster) {
+    public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
         if (!_isAlive) {
             throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
         }
@@ -276,7 +275,7 @@ public class RAS_Node {
             throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
         } else {
             allocateResourceToSlot(td, executors, target);
-            cluster.assign(target, td.getId(), executors);
+            _cluster.assign(target, td.getId(), executors);
             assignInternal(target, td.getId(), false);
         }
     }
@@ -286,11 +285,9 @@ public class RAS_Node {
      * This will update the cluster too.
      * @param td the TopologyDetails to assign a free slot to.
      * @param executors the executors to run in that slot.
-     * @param cluster the cluster to be updated
      */
-    public void assign(TopologyDetails td, Collection<ExecutorDetails> executors,
-                       Cluster cluster) {
-        this.assign(null, td, executors, cluster);
+    public void assign(TopologyDetails td, Collection<ExecutorDetails> executors) {
+        this.assign(null, td, executors);
     }
 
     @Override
@@ -347,70 +344,8 @@ public class RAS_Node {
         return total;
     }
 
-    public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
-        Map<String, RAS_Node> nodeIdToNode = new HashMap<String, RAS_Node>();
-        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
-            //Node ID and supervisor ID are the same.
-            String id = sup.getId();
-            boolean isAlive = !cluster.isBlackListed(id);
-            LOG.debug("Found a {} Node {} {}",
-                    isAlive ? "living" : "dead", id, sup.getAllPorts());
-            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
-            nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup));
-        }
-        for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
-            String topId = entry.getValue().getTopologyId();
-            for (WorkerSlot workerSlot : entry.getValue().getSlots()) {
-                String id = workerSlot.getNodeId();
-                RAS_Node node = nodeIdToNode.get(id);
-                if (node == null) {
-                    LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
-                            workerSlot, getExecutors(workerSlot, cluster));
-                    node = new RAS_Node(id, null, false, null);
-                    nodeIdToNode.put(id, node);
-                }
-                if (!node.isAlive()) {
-                    //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
-                    node.addOrphanedSlot(workerSlot);
-                }
-                if (node.assignInternal(workerSlot, topId, true)) {
-                    LOG.warn("Bad scheduling state, " + workerSlot + " assigned multiple workers, unassigning everything...");
-                    node.free(workerSlot, cluster);
-                }
-            }
-        }
-        RAS_Node.updateAvailableResources(cluster, topologies, nodeIdToNode);
-
-        for (Map.Entry<String, SchedulerAssignment> entry : cluster
-                .getAssignments().entrySet()) {
-            for (Map.Entry<ExecutorDetails, WorkerSlot> exec : entry.getValue()
-                    .getExecutorToSlot().entrySet()) {
-                ExecutorDetails ed = exec.getKey();
-                WorkerSlot ws = exec.getValue();
-                String node_id = ws.getNodeId();
-                if (nodeIdToNode.containsKey(node_id)) {
-                    RAS_Node node = nodeIdToNode.get(node_id);
-                    if (node._slotToExecs.containsKey(ws)) {
-                        node._slotToExecs.get(ws).add(ed);
-                        node._execs.add(ed);
-                    } else {
-                        LOG.info(
-                                "ERROR: should have node {} should have worker: {}",
-                                node_id, ed);
-                        return null;
-                    }
-                } else {
-                    LOG.info("ERROR: should have node {}", node_id);
-                    return null;
-                }
-            }
-        }
-        return nodeIdToNode;
-    }
-
     //This function is only used for logging information
-    private static Collection<ExecutorDetails> getExecutors(WorkerSlot ws,
-                                                            Cluster cluster) {
+    public static Collection<ExecutorDetails> getExecutors(WorkerSlot ws, Cluster cluster) {
         Collection<ExecutorDetails> retList = new ArrayList<ExecutorDetails>();
         for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
                 .entrySet()) {
@@ -430,56 +365,6 @@ public class RAS_Node {
     }
 
     /**
-     * updates the available resources for every node in a cluster
-     * by recalculating memory requirements.
-     * @param cluster the cluster used in this calculation
-     * @param topologies container of all topologies
-     * @param nodeIdToNode a map between node id and node
-     */
-    private static void updateAvailableResources(Cluster cluster,
-                                                 Topologies topologies,
-                                                 Map<String, RAS_Node> nodeIdToNode) {
-        //recompute memory
-        if (cluster.getAssignments().size() > 0) {
-            for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
-                    .entrySet()) {
-                Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
-                        .getExecutorToSlot();
-                Map<ExecutorDetails, Double> topoMemoryResourceList = topologies.getById(entry.getKey()).getTotalMemoryResourceList();
-
-                if (topoMemoryResourceList == null || topoMemoryResourceList.size() == 0) {
-                    continue;
-                }
-                for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
-                        .entrySet()) {
-                    WorkerSlot slot = execToSlot.getValue();
-                    ExecutorDetails exec = execToSlot.getKey();
-                    RAS_Node node = nodeIdToNode.get(slot.getNodeId());
-                    if (!node.isAlive()) {
-                        continue;
-                        // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
-                        // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
-                    }
-                    if (topoMemoryResourceList.containsKey(exec)) {
-                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
-                    } else {
-                        LOG.warn("Resource Req not found...Scheduling Task{} with memory requirement as on heap - {} and off heap - {} and CPU requirement as {}",
-                                exec,
-                                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
-                                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
-                        topologies.getById(entry.getKey()).addDefaultResforExec(exec);
-                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
-                    }
-                }
-            }
-        } else {
-            for (RAS_Node n : nodeIdToNode.values()) {
-                n.setAvailableMemory(n.getAvailableMemoryResources());
-            }
-        }
-    }
-
-    /**
      * Sets the Available Memory for a node
      * @param amount the amount to set as available memory
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
new file mode 100644
index 0000000..9df1475
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.SchedulerAssignment;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RAS_Nodes {
+
+    private Map<String, RAS_Node> nodeMap;
+    private Cluster cluster;
+    private Topologies topologies;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
+
+
+    public RAS_Nodes(Cluster cluster, Topologies topologies) {
+        this.nodeMap = getAllNodesFrom(cluster, topologies);
+        this.cluster = cluster;
+        this.topologies = topologies;
+    }
+
+    public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
+        Map<String, RAS_Node> nodeIdToNode = new HashMap<String, RAS_Node>();
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            //Node ID and supervisor ID are the same.
+            String id = sup.getId();
+            boolean isAlive = !cluster.isBlackListed(id);
+            LOG.debug("Found a {} Node {} {}",
+                    isAlive ? "living" : "dead", id, sup.getAllPorts());
+            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
+            nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster));
+        }
+        for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+            String topId = entry.getValue().getTopologyId();
+            for (WorkerSlot workerSlot : entry.getValue().getSlots()) {
+                String id = workerSlot.getNodeId();
+                RAS_Node node = nodeIdToNode.get(id);
+                if (node == null) {
+                    LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
+                            workerSlot, RAS_Node.getExecutors(workerSlot, cluster));
+                    node = new RAS_Node(id, null, false, null, cluster);
+                    nodeIdToNode.put(id, node);
+                }
+                if (!node.isAlive()) {
+                    //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
+                    node.addOrphanedSlot(workerSlot);
+                }
+                if (node.assignInternal(workerSlot, topId, true)) {
+                    LOG.warn("Bad scheduling state, " + workerSlot + " assigned multiple workers, unassigning everything...");
+                    node.free(workerSlot);
+                }
+            }
+        }
+        updateAvailableResources(cluster, topologies, nodeIdToNode);
+        return nodeIdToNode;
+    }
+
+    /**
+     * updates the available resources for every node in a cluster
+     * by recalculating memory requirements.
+     * @param cluster the cluster used in this calculation
+     * @param topologies container of all topologies
+     * @param nodeIdToNode a map between node id and node
+     */
+    private static void updateAvailableResources(Cluster cluster,
+                                                 Topologies topologies,
+                                                 Map<String, RAS_Node> nodeIdToNode) {
+        //recompute memory
+        if (cluster.getAssignments().size() > 0) {
+            for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
+                    .entrySet()) {
+                Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
+                        .getExecutorToSlot();
+                Map<ExecutorDetails, Double> topoMemoryResourceList = topologies.getById(entry.getKey()).getTotalMemoryResourceList();
+
+                if (topoMemoryResourceList == null || topoMemoryResourceList.size() == 0) {
+                    continue;
+                }
+                for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
+                        .entrySet()) {
+                    WorkerSlot slot = execToSlot.getValue();
+                    ExecutorDetails exec = execToSlot.getKey();
+                    RAS_Node node = nodeIdToNode.get(slot.getNodeId());
+                    if (!node.isAlive()) {
+                        continue;
+                        // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
+                        // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
+                    }
+                    if (topoMemoryResourceList.containsKey(exec)) {
+                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
+                    } else {
+                        LOG.warn("Resource Req not found...Scheduling Task{} with memory requirement as on heap - {} and off heap - {} and CPU requirement as {}",
+                                exec,
+                                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
+                                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
+                        topologies.getById(entry.getKey()).addDefaultResforExec(exec);
+                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
+                    }
+                }
+            }
+        } else {
+            for (RAS_Node n : nodeIdToNode.values()) {
+                n.setAvailableMemory(n.getAvailableMemoryResources());
+            }
+        }
+    }
+
+    public RAS_Node getNodeById(String nodeId) {
+        return this.nodeMap.get(nodeId);
+    }
+
+    public void freeSlots(Collection<WorkerSlot> workerSlots) {
+        for(RAS_Node node : nodeMap.values()) {
+            for(WorkerSlot ws : node.getUsedSlots()) {
+                if(workerSlots.contains(ws)) {
+                    node.free(ws);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 65d1841..279d060 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -19,6 +19,7 @@
 package backtype.storm.scheduler.resource;
 
 import backtype.storm.Config;
+import backtype.storm.scheduler.SchedulerAssignment;
 import backtype.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +40,10 @@ import java.util.Set;
 
 public class ResourceAwareScheduler implements IScheduler {
 
-    private Map<String, User> userMap = new HashMap<String, User>();
+    private Map<String, User> userMap;
     private Cluster cluster;
     private Topologies topologies;
+    private RAS_Nodes nodes;
 
 
     @SuppressWarnings("rawtypes")
@@ -53,21 +55,26 @@ public class ResourceAwareScheduler implements IScheduler {
     @Override
     public void prepare(Map conf) {
         this.conf = conf;
+
     }
 
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
         LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
+        LOG.info("topologies: {}", topologies);
 
         this.initialize(topologies, cluster);
 
-
         LOG.info("UserMap:\n{}", this.userMap);
         for(User user : this.getUserMap().values()) {
             LOG.info(user.getDetailedInfo());
         }
 
+        for(TopologyDetails topo : topologies.getTopologies()) {
+            LOG.info("topo {} status: {}", topo, cluster.getStatusMap().get(topo.getId()));
+        }
+
         LOG.info("getNextUser: {}", this.getNextUser());
 
         while(true) {
@@ -80,15 +87,135 @@ public class ResourceAwareScheduler implements IScheduler {
         }
     }
 
-    private void scheduleTopology(TopologyDetails td) {
+    /**
+     * Attempts to make room for {@code td} by evicting one running topology.
+     * <p>
+     * Nothing is evicted unless the submitter has both a CPU and a memory guarantee and both
+     * {@code 1.0 - getCPUResourcePoolUtilization()} and
+     * {@code 1.0 - getMemoryResourcePoolUtilization()} are strictly greater than the topology's
+     * requested CPU / memory divided by the respective guarantee. The victim is the topology
+     * returned by {@code getRunningTopologyWithLowestPriority()} of the user picked by
+     * {@link #findUserWithMostResourcesAboveGuarantee()}.
+     * <p>
+     * This method never changes the queue or status of {@code td} itself: scheduleTopology, its
+     * caller, moves {@code td} to the attempted queue and records the status whenever
+     * {@code false} is returned. (Previously the "no user to evict" path also moved it here,
+     * so the topology was moved twice.)
+     *
+     * @param td the topology that could not be scheduled for lack of resources
+     * @return {@code true} if a topology was evicted, {@code false} otherwise
+     */
+    private boolean makeSpaceForTopo(TopologyDetails td) {
+        User submitter = this.userMap.get(td.getTopologySubmitter());
+        if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
+            return false;
+        }
+
+        double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
+        double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+
+        //user has enough resource under his or her resource guarantee to schedule topology
+        if ((1.0 - submitter.getCPUResourcePoolUtilization()) > cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) > memoryNeeded) {
+            User evictUser = this.findUserWithMostResourcesAboveGuarantee();
+            if (evictUser == null) {
+                LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
+                return false;
+            }
+            TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+            LOG.info("topology to evict: {}", topologyEvict);
+            evictTopology(topologyEvict);
+
+            return true;
+        }
+
+        // The request does not fit under the submitter's guarantee: nothing may be evicted.
+        return false;
+    }
+
+    /**
+     * Evicts a topology: frees the worker slots the cluster reports as used by it and moves it
+     * from its submitter's running queue back to the pending queue.
+     *
+     * @param topologyEvict the topology to evict
+     */
+    private void evictTopology(TopologyDetails topologyEvict) {
+        Collection<WorkerSlot> occupiedSlots = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
+        User owner = this.userMap.get(topologyEvict.getTopologySubmitter());
+
+        LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), occupiedSlots);
+        // Release the slots first, then update the owner's queues.
+        this.nodes.freeSlots(occupiedSlots);
+        owner.moveTopoFromRunningToPending(topologyEvict, this.cluster);
+        // Query the cluster again so the log shows what is still assigned after eviction.
+        LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
+    }
+
+    /**
+     * Finds the user whose {@code getResourcePoolAverageUtilization()} exceeds 1.0 by the
+     * largest margin, considering only users that have at least one running topology.
+     *
+     * @return that user, or {@code null} when no user with running topologies is above 1.0
+     */
+    private User findUserWithMostResourcesAboveGuarantee() {
+        User worstOffender = null;
+        double largestExcess = 0.0;
+        for (User candidate : this.userMap.values()) {
+            double excess = candidate.getResourcePoolAverageUtilization() - 1.0;
+            // Written as a negated '>' so a NaN excess is skipped as well.
+            if (!(excess > largestExcess)) {
+                continue;
+            }
+            if (candidate.getTopologiesRunning().isEmpty()) {
+                continue;
+            }
+            largestExcess = excess;
+            worstOffender = candidate;
+        }
+        return worstOffender;
+    }
+
+    /**
+     * Replaces the cluster's assignments with a previously captured checkpoint.
+     *
+     * @param assignmentCheckpoint the assignment map handed to {@code Cluster.setAssignments}
+     */
+    public void resetAssignments(Map<String, SchedulerAssignment> assignmentCheckpoint) {
+        this.cluster.setAssignments(assignmentCheckpoint);
+    }
+
+    public void scheduleTopology(TopologyDetails td) {
         ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
-        if (cluster.needsScheduling(td) && cluster.getUnassignedExecutors(td).size() > 0) {
-            LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), td.getTopologySubmitter());
+        User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+        if (cluster.getUnassignedExecutors(td).size() > 0) {
+            LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
             LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
             LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
 
-            Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = RAStrategy.schedule(td);
+            Map<String, SchedulerAssignment> assignmentCheckpoint = this.cluster.getAssignments();
+
+            while (true) {
+                SchedulingResult result = RAStrategy.schedule(td);
+                LOG.info("scheduling result: {}", result);
+                if (result.isValid()) {
+                    if (result.isSuccess()) {
+                        try {
+                            if(mkAssignment(td, result.getSchedulingResultMap())) {
+                                topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
+                            } else {
+                                resetAssignments(assignmentCheckpoint);
+                                topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                            }
+                        } catch (IllegalStateException ex) {
+                            LOG.error(ex.toString());
+                            LOG.error("Unsuccessful in scheduling", td.getId());
+                            this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
+                            resetAssignments(assignmentCheckpoint);
+                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                        }
+                        break;
+                    } else {
+                        if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
+                            if(!this.makeSpaceForTopo(td)) {
+                                topologySubmitter.moveTopoFromPendingToAttempted(td);
+                                this.cluster.setStatus(td.getId(), result.getErrorMessage());
+                                resetAssignments(assignmentCheckpoint);
+                                break;
+                            }
+                            continue;
+                        } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
+                            topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                            resetAssignments(assignmentCheckpoint);
+                            break;
+                        } else {
+                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                            resetAssignments(assignmentCheckpoint);
+                            break;
+                        }
+                    }
+                } else {
+                    LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
+                    topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                    resetAssignments(assignmentCheckpoint);
+                    break;
+                }
+            }
+        } else {
+            LOG.warn("Topology {} is already fully scheduled!", td.getName());
+            topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
+            throw new IllegalStateException("illegal");
+        }
+    }
 
+    private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
+        if (schedulerAssignmentMap != null) {
             double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
             double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
             double requestedCpu = td.getTotalRequestedCpu();
@@ -96,39 +223,25 @@ public class ResourceAwareScheduler implements IScheduler {
             double assignedMemOffHeap = 0.0;
             double assignedCpu = 0.0;
 
-            if (schedulerAssignmentMap != null) {
-                try {
-                    Set<String> nodesUsed = new HashSet<String>();
-                    int assignedWorkers = schedulerAssignmentMap.keySet().size();
-                    for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
-                        WorkerSlot targetSlot = workerToTasksEntry.getKey();
-                        Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
-                        RAS_Node targetNode = RAStrategy.idToNode(targetSlot.getNodeId());
-                        targetNode.assign(targetSlot, td, execsNeedScheduling, this.cluster);
-                        LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
-                                td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
-                        if (!nodesUsed.contains(targetNode.getId())) {
-                            nodesUsed.add(targetNode.getId());
-                        }
-                        assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
-                        assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
-                        assignedCpu += targetSlot.getAllocatedCpu();
-                    }
-                    LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
-                    this.cluster.setStatus(td.getId(), "Fully Scheduled");
-                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToRunning(td);
-                    LOG.info("getNextUser: {}", this.getNextUser());
-                } catch (IllegalStateException ex) {
-                    LOG.error(ex.toString());
-                    LOG.error("Unsuccessful in scheduling", td.getId());
-                    this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
+            Set<String> nodesUsed = new HashSet<String>();
+            int assignedWorkers = schedulerAssignmentMap.keySet().size();
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
+                WorkerSlot targetSlot = workerToTasksEntry.getKey();
+                Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
+                RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
+                targetNode.assign(targetSlot, td, execsNeedScheduling);
+                LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
+                        td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
+                if (!nodesUsed.contains(targetNode.getId())) {
+                    nodesUsed.add(targetNode.getId());
                 }
-            } else {
-                LOG.error("Unsuccessful in scheduling {}", td.getId());
-                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-                this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
+                assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
+                assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
+                assignedCpu += targetSlot.getAllocatedCpu();
             }
+            LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
+            this.cluster.setStatus(td.getId(), "Fully Scheduled");
+
             Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
             LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
@@ -136,9 +249,10 @@ public class ResourceAwareScheduler implements IScheduler {
                     td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
             this.cluster.setResources(td.getId(), resources);
+            return true;
         } else {
-            LOG.warn("Topology {} already scheduled!", td.getName());
-            this.cluster.setStatus(td.getId(), "Fully Scheduled");
+            LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
+            return false;
         }
         updateSupervisorsResources(cluster, topologies);
     }
@@ -157,6 +271,70 @@ public class ResourceAwareScheduler implements IScheduler {
         }
         cluster.setSupervisorsResourcesMap(supervisors_resources);
     }
+
+
+//    private void scheduleTopology(TopologyDetails td) {
+//        ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
+//        if (cluster.needsScheduling(td) && cluster.getUnassignedExecutors(td).size() > 0) {
+//            LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), td.getTopologySubmitter());
+//            LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
+//            LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
+//
+//            Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = RAStrategy.schedule(td);
+//
+//            double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
+//            double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
+//            double requestedCpu = td.getTotalRequestedCpu();
+//            double assignedMemOnHeap = 0.0;
+//            double assignedMemOffHeap = 0.0;
+//            double assignedCpu = 0.0;
+//
+//            if (schedulerAssignmentMap != null) {
+//                try {
+//                    Set<String> nodesUsed = new HashSet<String>();
+//                    int assignedWorkers = schedulerAssignmentMap.keySet().size();
+//                    for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
+//                        WorkerSlot targetSlot = workerToTasksEntry.getKey();
+//                        Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
+//                        RAS_Node targetNode = RAStrategy.idToNode(targetSlot.getNodeId());
+//                        targetNode.assign(targetSlot, td, execsNeedScheduling, this.cluster);
+//                        LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
+//                                td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
+//                        if (!nodesUsed.contains(targetNode.getId())) {
+//                            nodesUsed.add(targetNode.getId());
+//                        }
+//                        assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
+//                        assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
+//                        assignedCpu += targetSlot.getAllocatedCpu();
+//                    }
+//                    LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
+//                    this.cluster.setStatus(td.getId(), "Fully Scheduled");
+//                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToRunning(td);
+//                    LOG.info("getNextUser: {}", this.getNextUser());
+//                } catch (IllegalStateException ex) {
+//                    LOG.error(ex.toString());
+//                    LOG.error("Unsuccessful in scheduling", td.getId());
+//                    this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
+//                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
+//                }
+//            } else {
+//                LOG.error("Unsuccessful in scheduling {}", td.getId());
+//                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
+//              //  this.evictTopology(td);
+//               // this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
+//            }
+//            Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
+//                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
+//            LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
+//                            "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
+//                    td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
+//                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
+//            this.cluster.setResources(td.getId(), resources);
+//        } else {
+//            LOG.warn("Topology {} already scheduled!", td.getName());
+//            this.cluster.setStatus(td.getId(), "Fully Scheduled");
+//        }
+//    }
     public User getUser(String user) {
         return this.userMap.get(user);
     }
@@ -166,9 +344,11 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     public User getNextUser() {
-        Double least = Double.MAX_VALUE;
+        Double least = Double.POSITIVE_INFINITY;
         User ret = null;
         for(User user : this.userMap.values()) {
+            LOG.info("{}", user.getDetailedInfo());
+            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
             if(user.hasTopologyNeedSchedule()) {
                 Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
                 if (least > userResourcePoolAverageUtilization) {
@@ -199,18 +379,25 @@ public class ResourceAwareScheduler implements IScheduler {
      */
     private void initUsers(Topologies topologies, Cluster cluster) {
 
+        // Start from an empty user map every time this method runs.
+        this.userMap = new HashMap<String, User>();
         Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
         LOG.info("userResourcePools: {}", userResourcePools);
 
-        for (TopologyDetails topo : topologies.getTopologies()) {
-            String topologySubmitter = topo.getTopologySubmitter();
+        for (TopologyDetails td : topologies.getTopologies()) {
+            LOG.info("topology: {} from {}", td.getName(), td.getTopologySubmitter());
+            String topologySubmitter = td.getTopologySubmitter();
+            // Topologies without a submitter are grouped under the "anonymous" user.
+            // NOTE(review): makeSpaceForTopo and scheduleTopology look users up with the raw
+            // td.getTopologySubmitter() value -- confirm it cannot be null there.
+            if(topologySubmitter == null) {
+                LOG.warn("Topology {} submitted by anonymous user", td.getName());
+                topologySubmitter = "anonymous";
+            }
             if(!this.userMap.containsKey(topologySubmitter)) {
                 this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
             }
-            if(cluster.getUnassignedExecutors(topo).size() >= topo.getExecutors().size()) {
-                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(topo);
+            // All executors still unassigned -> pending; anything else (even a partial
+            // assignment) is filed as running.
+            if(cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
+                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td, cluster);
+                LOG.info(this.userMap.get(topologySubmitter).getDetailedInfo());
             } else {
-                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(topo);
+                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td, cluster);
             }
         }
     }
@@ -219,6 +406,7 @@ public class ResourceAwareScheduler implements IScheduler {
         initUsers(topologies, cluster);
         this.cluster = cluster;
         this.topologies = topologies;
+        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
     }
 
     /**
@@ -226,14 +414,13 @@ public class ResourceAwareScheduler implements IScheduler {
      * @return
      */
     private Map<String, Map<String, Double>> getUserResourcePools() {
-
-        Map<String, Map<String, Number>> raw =  (Map<String, Map<String, Number>>)this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
         Map<String, Map<String, Double>> ret =  (Map<String, Map<String, Double>>)this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
 
         if (raw == null) {
-            ret = new  HashMap<String, Map<String, Double>>();
+            ret = new HashMap<String, Map<String, Double>>();
         } else {
-            for(Map.Entry<String, Map<String, Number>> UserPoolEntry : raw.entrySet()) {
+            for(Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
                 String user = UserPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
                 for(Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
new file mode 100644
index 0000000..9e7b1ff
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Outcome of one attempt to schedule a topology: a {@link SchedulingStatus}, the
+ * slot-to-executors map produced by a successful attempt, and optional informational and
+ * error messages. Instances are immutable apart from the contents of the map, which is stored
+ * by reference.
+ */
+public class SchedulingResult {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulingResult.class);
+
+    //contains the result for the attempted scheduling
+    private final Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap;
+
+    private final SchedulingStatus status;
+
+    private final String message;
+
+    private final String errorMessage;
+
+    /**
+     * Creates a result without any consistency checks; use {@link #isValid()} to verify that
+     * status and map agree, or prefer the static factory methods.
+     *
+     * @param status              outcome of the scheduling attempt
+     * @param schedulingResultMap executors to place on each worker slot; expected to be
+     *                            non-null exactly when {@code status} is a success
+     * @param message             optional informational message
+     * @param errorMessage        optional description of the failure
+     */
+    public SchedulingResult(SchedulingStatus status, Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message, String errorMessage) {
+        this.status = status;
+        this.schedulingResultMap = schedulingResultMap;
+        this.message = message;
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Creates a result with the given status and error message and no scheduling map.
+     *
+     * @param status       the status to report
+     * @param errorMessage description of the failure
+     * @return the new result
+     */
+    public static SchedulingResult failure(SchedulingStatus status, String errorMessage) {
+        return new SchedulingResult(status, null, null, errorMessage);
+    }
+
+    /**
+     * Creates a {@link SchedulingStatus#SUCCESS} result without a message.
+     *
+     * @param schedulingResultMap the non-null scheduling map
+     * @return the new result
+     * @throws IllegalStateException if {@code schedulingResultMap} is {@code null}
+     */
+    public static SchedulingResult success(Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap) {
+        return SchedulingResult.successWithMsg(schedulingResultMap, null);
+    }
+
+    /**
+     * Creates a {@link SchedulingStatus#SUCCESS} result with an informational message.
+     *
+     * @param schedulingResultMap the non-null scheduling map
+     * @param message             optional informational message
+     * @return the new result
+     * @throws IllegalStateException if {@code schedulingResultMap} is {@code null}
+     */
+    public static SchedulingResult successWithMsg(Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message) {
+        if (schedulingResultMap == null) {
+            throw new IllegalStateException("Cannot declare scheduling success without providing a non null scheduling map!");
+        }
+        return new SchedulingResult(SchedulingStatus.SUCCESS, schedulingResultMap, message, null);
+    }
+
+    public SchedulingStatus getStatus() {
+        return this.status;
+    }
+
+    public String getMessage() {
+        return this.message;
+    }
+
+    public String getErrorMessage() {
+        return this.errorMessage;
+    }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> getSchedulingResultMap() {
+        return schedulingResultMap;
+    }
+
+    /** @return whether {@link SchedulingStatus#isStatusSuccess} accepts this result's status */
+    public boolean isSuccess() {
+        return SchedulingStatus.isStatusSuccess(this.status);
+    }
+
+    /** @return whether {@link SchedulingStatus#isStatusFailure} accepts this result's status */
+    public boolean isFailure() {
+        return SchedulingStatus.isStatusFailure(this.status);
+    }
+
+    /**
+     * Checks that status and scheduling map are consistent: a success must carry a map and a
+     * failure must not. A warning is logged for each violation.
+     *
+     * @return {@code false} if one of the two rules is violated, {@code true} otherwise
+     */
+    public boolean isValid() {
+        if (this.isSuccess() && this.getSchedulingResultMap() == null) {
+            LOG.warn("SchedulingResult not Valid! Status is success but SchedulingResultMap is null");
+            return false;
+        }
+        if (this.isFailure() && this.getSchedulingResultMap() != null) {
+            LOG.warn("SchedulingResult not Valid! Status is Failure but SchedulingResultMap is NOT null");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        if (this.isSuccess()) {
+            // Concatenate the map itself rather than calling toString() on it: a SUCCESS result
+            // built through the public constructor may carry a null map (see isValid()), and
+            // string conversion renders that as "null" instead of throwing.
+            return "Status: " + this.getStatus() + " message: " + this.getMessage() + " scheduling: " + this.getSchedulingResultMap();
+        }
+        return "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingStatus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingStatus.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingStatus.java
new file mode 100644
index 0000000..4622c29
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingStatus.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import java.util.EnumSet;
+
+/**
+ * Status of a scheduling attempt, together with the groups of statuses that count as
+ * success and as failure.
+ */
+public enum SchedulingStatus {
+    SUCCESS,
+    FAIL_NOT_ENOUGH_RESOURCES,
+    FAIL_INVALID_TOPOLOGY,
+    FAIL_OTHER;
+
+    // final so the groups cannot be reassigned; the sets themselves are still mutable
+    // EnumSets and should be treated as read-only by callers.
+    /** Statuses reported as success by {@link #isStatusSuccess(SchedulingStatus)}. */
+    public static final EnumSet<SchedulingStatus> success = EnumSet.of(SUCCESS);
+    /** Statuses reported as failure by {@link #isStatusFailure(SchedulingStatus)}. */
+    public static final EnumSet<SchedulingStatus> failure = EnumSet.of(FAIL_INVALID_TOPOLOGY, FAIL_NOT_ENOUGH_RESOURCES, FAIL_OTHER);
+
+    /**
+     * @param status the status to test
+     * @return {@code true} if {@code status} is contained in {@link #success}
+     */
+    public static boolean isStatusSuccess(SchedulingStatus status) {
+        return success.contains(status);
+    }
+
+    /**
+     * @param status the status to test
+     * @return {@code true} if {@code status} is contained in {@link #failure}
+     */
+    public static boolean isStatusFailure(SchedulingStatus status) {
+        return failure.contains(status);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 32af686..068db54 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -18,6 +18,7 @@
 
 package backtype.storm.scheduler.resource;
 
+import backtype.storm.scheduler.Cluster;
 import backtype.storm.scheduler.TopologyDetails;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,8 @@ public class User {
     //Topologies that was attempted to be scheduled but wasn't successull
     private Set<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
+    private Set<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+
     private Map<String, Double> resourcePool = new HashMap<String, Double>();
 
     private static final Logger LOG = LoggerFactory.getLogger(User.class);
@@ -61,12 +64,26 @@ public class User {
         return this.userId;
     }
 
-    public void addTopologyToPendingQueue(TopologyDetails topo) {
+    public void addTopologyToPendingQueue(TopologyDetails topo, Cluster cluster) {
         this.pendingQueue.add(topo);
+        if (cluster != null) {
+            cluster.setStatus(topo.getId(), "Scheduling Pending");
+        }
     }
 
-    public void addTopologyToRunningQueue(TopologyDetails topo) {
+    public void addTopologyToPendingQueue(TopologyDetails topo) {
+        this.addTopologyToPendingQueue(topo, null);
+    }
+
+    public void addTopologyToRunningQueue(TopologyDetails topo, Cluster cluster) {
         this.runningQueue.add(topo);
+        if (cluster != null) {
+            cluster.setStatus(topo.getId(), "Fully Scheduled");
+        }
+    }
+
+    public void addTopologyToRunningQueue(TopologyDetails topo) {
+        this.addTopologyToRunningQueue(topo, null);
     }
 
     public Set<TopologyDetails> getTopologiesPending() {
@@ -94,23 +111,69 @@ public class User {
         return null;
     }
 
-    public void moveTopoFromPendingToRunning(TopologyDetails topo) {
+    public void moveTopoFromPendingToRunning(TopologyDetails topo, Cluster cluster) { // cluster may be null: then no status is reported
         moveTopology(topo, this.pendingQueue, "pending", this.runningQueue, "running");
+        if (cluster != null && topo != null) { // moveTopology() returns quietly on a null topo, so it must not be dereferenced here
+            cluster.setStatus(topo.getId(), "Fully Scheduled"); // NOTE(review): also runs when moveTopology() bailed out with a warning - confirm intended
+        }
     }
 
-    public void moveTopoFromPendingToAttempted(TopologyDetails topo) {
+    public void moveTopoFromPendingToRunning(TopologyDetails topo) {
+        this.moveTopoFromPendingToRunning(topo, null); // null cluster: queue move only, no status update
+    }
+
+
+    public void moveTopoFromPendingToAttempted(TopologyDetails topo, Cluster cluster) { // cluster may be null: then no status is reported
         moveTopology(topo, this.pendingQueue, "pending", this.attemptedQueue, "attempted");
+        if (cluster != null && topo != null) { // moveTopology() returns quietly on a null topo, so it must not be dereferenced here
+            cluster.setStatus(topo.getId(), "Scheduling Attempted but Failed"); // NOTE(review): also runs when moveTopology() bailed out with a warning - confirm intended
+        }
+    }
+
+    public void moveTopoFromPendingToAttempted(TopologyDetails topo) {
+        this.moveTopoFromPendingToAttempted(topo, null); // null cluster: queue move only, no status update
     }
 
-    private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName)  {
-        if(topo == null) {
+
+    public void moveTopoFromPendingToInvalid(TopologyDetails topo, Cluster cluster) { // cluster may be null: then no status is reported
+        moveTopology(topo, this.pendingQueue, "pending", this.invalidQueue, "invalid");
+        if (cluster != null && topo != null) { // moveTopology() returns quietly on a null topo, so it must not be dereferenced here
+            cluster.setStatus(topo.getId(), "Scheduling Attempted but topology is invalid"); // NOTE(review): also runs when moveTopology() bailed out with a warning - confirm intended
+        }
+    }
+
+    public void moveTopoFromPendingToInvalid(TopologyDetails topo) {
+        this.moveTopoFromPendingToInvalid(topo, null); // null cluster: queue move only, no status update
+    }
+
+
+    public void moveTopoFromRunningToPending(TopologyDetails topo, Cluster cluster) { // cluster may be null: then no status is reported
+        moveTopology(topo, this.runningQueue, "running", this.pendingQueue, "pending");
+        if (cluster != null && topo != null) { // moveTopology() returns quietly on a null topo, so it must not be dereferenced here
+            cluster.setStatus(topo.getId(), "Scheduling Pending"); // NOTE(review): also runs when moveTopology() bailed out with a warning - confirm intended
+        }
+    }
+
+    public void moveTopoFromRunningToPending(TopologyDetails topo) {
+        this.moveTopoFromRunningToPending(topo, null); // null cluster: queue move only, no status update
+    }
+
+
+    private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
+        LOG.info("{} queue: {}", srcName, src);
+        LOG.info("{} queue: {}", destName, dest);
+        if (topo == null) {
             return;
         }
-        if(!src.contains(topo)) {
+        if (!src.contains(topo)) {
             LOG.warn("Topo {} not in User: {} {} queue!", topo.getName(), this.userId, srcName);
+            LOG.info("topo {}-{}-{}", topo.getName(), topo.getId(), topo.hashCode());
+            for (TopologyDetails t : src) {
+                LOG.info("queue entry: {}-{}-{}", t.getName(), t.getId(), t.hashCode());
+            }
             return;
         }
-        if(dest.contains(topo)) {
+        if (dest.contains(topo)) {
             LOG.warn("Topo {} already in in User: {} {} queue!", topo.getName(), this.userId, destName);
             return;
         }
@@ -124,8 +187,8 @@ public class User {
         Double cpuResourcePoolUtilization = this.getCPUResourcePoolUtilization();
         Double memoryResourcePoolUtilization = this.getMemoryResourcePoolUtilization();
 
-        if(cpuResourcePoolUtilization != null && memoryResourcePoolUtilization != null) {
-            return (cpuResourcePoolUtilization + memoryResourcePoolUtilization ) / 2.0;
+        if (cpuResourcePoolUtilization != null && memoryResourcePoolUtilization != null) {
+            return (cpuResourcePoolUtilization + memoryResourcePoolUtilization) / 2.0;
         }
         return Double.MAX_VALUE;
     }
@@ -173,7 +236,7 @@ public class User {
 
     public TopologyDetails getNextTopologyToSchedule() {
         for (TopologyDetails topo : this.pendingQueue) {
-            if(!this.attemptedQueue.contains(topo)) {
+            if (!this.attemptedQueue.contains(topo)) {
                 return topo;
             }
         }
@@ -184,6 +247,13 @@ public class User {
         return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
     }
 
+    public TopologyDetails getRunningTopologyWithLowestPriority() {
+        // Hand back the head of the running queue (first in its iteration order);
+        // an empty queue yields null instead of a NoSuchElementException.
+        final boolean nothingRunning = this.runningQueue.isEmpty();
+        return nothingRunning ? null : this.runningQueue.iterator().next();
+    }
+
     @Override
     public int hashCode() {
         return this.userId.hashCode();
@@ -197,9 +267,10 @@ public class User {
     public String getDetailedInfo() {
         String ret = "\nUser: " + this.userId;
         ret += "\n - " + " Resource Pool: " + this.resourcePool;
-        ret += "\n - " + " Running Queue: " + this.runningQueue;
-        ret += "\n - " + " Pending Queue: " + this.pendingQueue;
-        ret += "\n - " + " Attempted Queue: " + this.attemptedQueue;
+        ret += "\n - " + " Running Queue: " + this.runningQueue + " size: " + this.runningQueue.size();
+        ret += "\n - " + " Pending Queue: " + this.pendingQueue + " size: " + this.pendingQueue.size();
+        ret += "\n - " + " Attempted Queue: " + this.attemptedQueue + " size: " + this.attemptedQueue.size();
+        ret += "\n - " + " Invalid Queue: " + this.invalidQueue + " size: " + this.invalidQueue.size();
         ret += "\n - " + " CPU Used: " + this.getCPUResourceUsedByUser() + " CPU guaranteed: " + this.getCPUResourceGuaranteed();
         ret += "\n - " + " Memory Used: " + this.getMemoryResourceUsedByUser() + " Memory guaranteed: " + this.getMemoryResourceGuaranteed();
         ret += "\n - " + " % Resource Guarantee Used: \n -- CPU: " + this.getCPUResourcePoolUtilization()
@@ -209,12 +280,31 @@ public class User {
 
     public static String getResourcePoolAverageUtilizationForUsers(Collection<User> users) {
         String ret = "";
-        for(User user : users) {
+        for (User user : users) { // each user contributes one "<id> - <average utilization> " entry
             ret += user.getId() + " - " + user.getResourcePoolAverageUtilization() + " ";
         }
         return ret;
     }
 
+    /** Same id compares as 0; otherwise order by priority, then up time (larger first), then id. */
+    public static int cTo(TopologyDetails topo1, TopologyDetails topo2) {
+        final int idOrder = topo1.getId().compareTo(topo2.getId());
+        if (idOrder == 0) {
+            return 0;
+        }
+        if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
+            return 1;
+        }
+        if (topo1.getTopologyPriority() < topo2.getTopologyPriority()) {
+            return -1;
+        }
+        if (topo1.getUpTime() > topo2.getUpTime()) {
+            return -1;
+        }
+        // Ids differ at this point, so idOrder is a non-zero final tie-breaker.
+        return (topo1.getUpTime() < topo2.getUpTime()) ? 1 : idOrder;
+    }
+
     /**
      * Comparator that sorts topologies by priority and then by submission time
      */
@@ -231,7 +321,7 @@ public class User {
                 } else if (topo1.getUpTime() < topo2.getUpTime()) {
                     return 1;
                 } else {
-                    return 0;
+                    return topo1.getId().compareTo(topo2.getId());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
index 01f3223..722eddb 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/IStrategy.java
@@ -26,6 +26,7 @@ import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.scheduler.WorkerSlot;
 import backtype.storm.scheduler.resource.RAS_Node;
+import backtype.storm.scheduler.resource.SchedulingResult;
 
 /**
  * An interface to for implementing different scheduling strategies for the resource aware scheduling
@@ -33,5 +34,5 @@ import backtype.storm.scheduler.resource.RAS_Node;
  */
 public interface IStrategy {
 
-    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td);
+    public SchedulingResult schedule(TopologyDetails td); // the SchedulingResult carries success or failure (see ResourceAwareStrategy.schedule)
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
index 3fe37dd..812bf5d 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/ResourceAwareStrategy.java
@@ -30,6 +30,9 @@ import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Iterator;
 
+import backtype.storm.scheduler.resource.RAS_Nodes;
+import backtype.storm.scheduler.resource.SchedulingResult;
+import backtype.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +64,7 @@ public class ResourceAwareStrategy implements IStrategy {
     public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
         _topologies = topologies;
         _cluster = cluster;
-        _nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
+        _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
         _availNodes = this.getAvailNodes();
         _clusterInfo = cluster.getNetworkTopography();
         LOG.debug(this.getClusterInfo());
@@ -84,10 +87,10 @@ public class ResourceAwareStrategy implements IStrategy {
         return retMap;
     }
 
-    public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
+    public SchedulingResult schedule(TopologyDetails td) {
         if (_availNodes.size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
-            return null;
+            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
         }
         Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
         Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
@@ -97,7 +100,7 @@ public class ResourceAwareStrategy implements IStrategy {
 
         if (spouts.size() == 0) {
             LOG.error("Cannot find a Spout!");
-            return null;
+            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
         }
 
         Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
@@ -159,18 +162,22 @@ public class ResourceAwareStrategy implements IStrategy {
                 LOG.error("Not Enough Resources to schedule Task {}", exec);
             }
         }
+
+        SchedulingResult result;
         executorsNotScheduled.removeAll(scheduledTasks);
         if (executorsNotScheduled.size() > 0) {
             LOG.error("Not all executors successfully scheduled: {}",
                     executorsNotScheduled);
             schedulerAssignmentMap = null;
+            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
         } else {
             LOG.debug("All resources successfully scheduled!");
+            result = SchedulingResult.success(schedulerAssignmentMap);
         }
         if (schedulerAssignmentMap == null) {
             LOG.error("Topology {} not successfully scheduled!", td.getId());
         }
-        return schedulerAssignmentMap;
+        return result;
     }
 
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {