You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:38 UTC

[10/23] storm git commit: first initial implementation

first initial implementation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e832205
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e832205
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e832205

Branch: refs/heads/master
Commit: 3e8322053c3d7fc7877b2985bae1ad657562c931
Parents: 4cd5efa
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Nov 25 17:13:44 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:11 2015 -0600

----------------------------------------------------------------------
 conf/user-resource-pools-example.yaml           |  26 +
 .../starter/ResourceAwareExampleTopology.java   |   6 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   5 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   5 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |  41 +-
 .../storm/scheduler/SupervisorDetails.java      |   4 +-
 .../backtype/storm/scheduler/Topologies.java    |   8 +-
 .../storm/scheduler/TopologyDetails.java        |  47 +-
 .../storm/scheduler/resource/RAS_Node.java      |  32 +-
 .../storm/scheduler/resource/RAS_Nodes.java     |   7 +-
 .../resource/ResourceAwareScheduler.java        | 228 +++++----
 .../backtype/storm/scheduler/resource/User.java |  52 +-
 .../eviction/DefaultEvictionStrategy.java       |  46 +-
 .../DefaultSchedulingPriorityStrategy.java      |  14 +-
 .../priority/ISchedulingPriorityStrategy.java   |   2 +-
 .../DefaultResourceAwareStrategy.java           |   4 +-
 .../strategies/scheduling/IStrategy.java        |  11 +
 .../storm/validation/ConfigValidation.java      |  12 +-
 .../scheduler/resource_aware_scheduler_test.clj | 129 +++--
 .../storm/scheduler/resource/Experiment.java    | 222 ---------
 .../resource/TestResourceAwareScheduler.java    | 479 ++++++++++++++++++-
 .../storm/scheduler/resource/TestUser.java      |   4 +-
 .../TestUtilsForResourceAwareScheduler.java     |  42 +-
 23 files changed, 893 insertions(+), 533 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/conf/user-resource-pools-example.yaml
----------------------------------------------------------------------
diff --git a/conf/user-resource-pools-example.yaml b/conf/user-resource-pools-example.yaml
new file mode 100644
index 0000000..829a6be
--- /dev/null
+++ b/conf/user-resource-pools-example.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+resource.aware.scheduler.user.pools:
+    jerry:
+        cpu: 1000
+        memory: 8192.0
+    derek:
+        cpu: 10000.0
+        memory: 32768
+    bobby:
+        cpu: 5000.0
+        memory: 16384.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
index e8225d4..a9ac659 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -84,6 +84,12 @@ public class ResourceAwareExampleTopology {
      */
     conf.setTopologyWorkerMaxHeapSize(1024.0);
 
+    // Set topology priority 0-30 with 0 being the highest priority and 30 being the lowest priority.
+    conf.setTopologyPriority(30);
+
+    //Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
+    conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
+
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e22161b..d76825d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -847,13 +847,14 @@
 
         supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
         cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
-
+        _ (.setStatusMap cluster (deref (:id->sched-status nimbus)))
         ;; call scheduler.schedule to schedule all the topologies
         ;; the new assignments for all the topologies are in the cluster object.
         _ (.schedule (:scheduler nimbus) topologies cluster)
         _ (.setResourcesMap cluster @(:id->resources nimbus))
         _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
-        _ (reset! (:id->sched-status nimbus) (.getStatusMap cluster))
+        ;;merge with existing statuses
+        _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
         _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
         _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
     (.getAssignments cluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 54af6fa..f3c8c4a 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2228,11 +2228,10 @@ public class Config extends HashMap<String, Object> {
     }
 
     /**
-     * Takes as input the scheduler class name.
-     * Currently only the Multitenant Scheduler and Resource Aware Scheduler are supported
+     * Takes as input the strategy class name. Strategy must implement the IStrategy interface
      */
     public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
-        if(clazz != null) {
+        if (clazz != null) {
             this.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, clazz.getName());
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 2676af1..c35dbbd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import backtype.storm.Config;
 import backtype.storm.networktopography.DNSToSwitchMapping;
 import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Cluster {
 
@@ -91,9 +93,18 @@ public class Cluster {
         this.conf = storm_conf;
     }
 
+    /**
+     * Get a copy of this cluster object
+     */
     public Cluster getCopy() {
-        Cluster copy = new Cluster(this.inimbus, this.supervisors, this.assignments, this.conf);
-        for(Map.Entry<String, String> entry : this.status.entrySet()) {
+        HashMap<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
+        for (Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
+            newAssignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
+        }
+        Map newConf = new HashMap<String, Object>();
+        newConf.putAll(this.conf);
+        Cluster copy = new Cluster(this.inimbus, this.supervisors, newAssignments, newConf);
+        for (Map.Entry<String, String> entry : this.status.entrySet()) {
             copy.setStatus(entry.getKey(), entry.getValue());
         }
         return copy;
@@ -447,9 +458,12 @@ public class Cluster {
         return ret;
     }
 
-    public void setAssignments(Map<String, SchedulerAssignment> assignments) {
+    /**
+     * set assignments for cluster
+     */
+    public void setAssignments(Map<String, SchedulerAssignment> newAssignments) {
         this.assignments = new HashMap<String, SchedulerAssignmentImpl>();
-        for(Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
+        for (Map.Entry<String, SchedulerAssignment> entry : newAssignments.entrySet()) {
             this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
         }
     }
@@ -466,7 +480,7 @@ public class Cluster {
      */
     public double getClusterTotalCPUResource() {
         double sum = 0.0;
-        for(SupervisorDetails sup: this.supervisors.values()) {
+        for (SupervisorDetails sup : this.supervisors.values()) {
             sum += sup.getTotalCPU();
         }
         return sum;
@@ -477,7 +491,7 @@ public class Cluster {
      */
     public double getClusterTotalMemoryResource() {
         double sum = 0.0;
-        for(SupervisorDetails sup: this.supervisors.values()) {
+        for (SupervisorDetails sup : this.supervisors.values()) {
             sum += sup.getTotalMemory();
         }
         return sum;
@@ -607,14 +621,29 @@ public class Cluster {
         }
     }
 
+    /**
+     * set scheduler status for a topology
+     */
+    private static final Logger LOG = LoggerFactory
+            .getLogger(Cluster.class);
     public void setStatus(String topologyId, String status) {
         this.status.put(topologyId, status);
     }
 
+    /**
+     * Get all schedule statuses
+     */
     public Map<String, String> getStatusMap() {
         return this.status;
     }
 
+    /**
+     * set scheduler status map
+     */
+    public void setStatusMap(Map<String, String> statusMap) {
+        this.status.putAll(statusMap);
+    }
+
     public void setResources(String topologyId, Double[] resources) {
         this.resources.put(topologyId, resources);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
index d93252f..afdabe8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java
@@ -120,7 +120,7 @@ public class SupervisorDetails {
 
     public Double getTotalMemory() {
         Double totalMemory = getTotalResource(Config.SUPERVISOR_MEMORY_CAPACITY_MB);
-        if(totalMemory == null) {
+        if (totalMemory == null) {
             throw new IllegalStateException("default value for supervisor.memory.capacity.mb is not set!");
         }
         return totalMemory;
@@ -128,7 +128,7 @@ public class SupervisorDetails {
 
     public Double getTotalCPU() {
         Double totalCPU = getTotalResource(Config.SUPERVISOR_CPU_CAPACITY);
-        if(totalCPU == null) {
+        if (totalCPU == null) {
             throw new IllegalStateException("default value for supervisor.cpu.capacity is not set!");
         }
         return totalCPU;

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 59a53c8..3a6361f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -29,10 +29,10 @@ public class Topologies {
     Map<String, Map<String, Component>> _allComponents;
 
     public Topologies(Map<String, TopologyDetails> topologies) {
-        if(topologies==null) topologies = new HashMap();
-        this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
+        if(topologies==null) topologies = new HashMap<>();
+        this.topologies = new HashMap<>(topologies.size());
         this.topologies.putAll(topologies);
-        this.nameToId = new HashMap<String, String>(topologies.size());
+        this.nameToId = new HashMap<>(topologies.size());
         
         for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
             TopologyDetails topology = entry.getValue();
@@ -75,7 +75,7 @@ public class Topologies {
     @Override
     public String toString() {
         String ret = "Topologies:\n";
-        for(TopologyDetails td : this.getTopologies()) {
+        for (TopologyDetails td : this.getTopologies()) {
             ret += td.toString() + "\n";
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 3931b43..871ae9b 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory;
 
 
 public class TopologyDetails {
-    String topologyId;
-    Map topologyConf;
-    StormTopology topology;
-    Map<ExecutorDetails, String> executorToComponent;
-    int numWorkers;
+    private String topologyId;
+    private Map topologyConf;
+    private StormTopology topology;
+    private Map<ExecutorDetails, String> executorToComponent;
+    private int numWorkers;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
     private Map<ExecutorDetails, Map<String, Double>> _resourceList;
     //Max heap size for a worker used by topology
     private Double topologyWorkerMaxHeapSize;
-
+    //topology priority
     private Integer topologyPriority;
-
+    //when topology was launched
     private int launchTime;
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
@@ -415,17 +415,16 @@ public class TopologyDetails {
      * Add default resource requirements for a executor
      */
     public void addDefaultResforExec(ExecutorDetails exec) {
-
         Double topologyComponentCpuPcorePercent = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
-        if(topologyComponentCpuPcorePercent == null) {
+        if (topologyComponentCpuPcorePercent == null) {
             LOG.warn("default value for topology.component.cpu.pcore.percent needs to be set!");
         }
-        Double topologyComponentResourcesOffheapMemoryMb =  Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
-        if(topologyComponentResourcesOffheapMemoryMb == null) {
+        Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+        if (topologyComponentResourcesOffheapMemoryMb == null) {
             LOG.warn("default value for topology.component.resources.offheap.memory.mb needs to be set!");
         }
         Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
-        if(topologyComponentResourcesOnheapMemoryMb == null) {
+        if (topologyComponentResourcesOnheapMemoryMb == null) {
             LOG.warn("default value for topology.component.resources.onheap.memory.mb needs to be set!");
         }
 
@@ -446,11 +445,11 @@ public class TopologyDetails {
      */
     private void initConfigs() {
         this.topologyWorkerMaxHeapSize = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
-        if(this.topologyWorkerMaxHeapSize == null) {
+        if (this.topologyWorkerMaxHeapSize == null) {
             LOG.warn("default value for topology.worker.max.heap.size.mb needs to be set!");
         }
         this.topologyPriority = Utils.getInt(this.topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
-        if(this.topologyPriority == null) {
+        if (this.topologyPriority == null) {
             LOG.warn("default value for topology.priority needs to be set!");
         }
     }
@@ -463,17 +462,35 @@ public class TopologyDetails {
         return this.topologyWorkerMaxHeapSize;
     }
 
+    /**
+     * Get the user that submitted this topology
+     */
     public String getTopologySubmitter() {
-       return (String)this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        if (user == null || user.equals("")) {
+            LOG.debug("Topology {} submitted by anonymous user", this.getName());
+            user = "anonymous";
+        }
+        return user;
     }
 
+    /**
+     * get teh priority of this topology
+     */
     public int getTopologyPriority() {
        return this.topologyPriority;
     }
+
+    /**
+     * Get the timestamp of when this topology was launched
+     */
     public int getLaunchTime() {
         return this.launchTime;
     }
 
+    /**
+     * Get how long this topology has been executing
+     */
     public int getUpTime() {
         return Time.currentTimeSecs() - this.launchTime;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 21367f0..54775bf 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -90,7 +90,7 @@ public class RAS_Node {
 
     public Collection<WorkerSlot> getUsedSlots() {
         Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
-        for(Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
+        for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
             ret.addAll(workers);
         }
         return ret;
@@ -243,30 +243,28 @@ public class RAS_Node {
 
     public void freeMemory(double amount) {
         _availMemory += amount;
-        LOG.info("freeing {} memory...avail mem: {}", amount, _availMemory);
-        if(_availMemory > this.getTotalMemoryResources()) {
+        LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), _availMemory);
+        if (_availMemory > this.getTotalMemoryResources()) {
             LOG.warn("Freeing more memory than there exists!");
         }
     }
 
     public void freeCPU(double amount) {
         _availCPU += amount;
-        LOG.info("freeing {} CPU...avail CPU: {}", amount, _availCPU);
-        if(_availCPU > this.getAvailableCpuResources()) {
+        LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), _availCPU);
+        if (_availCPU > this.getAvailableCpuResources()) {
             LOG.warn("Freeing more memory than there exists!");
         }
     }
 
     public double getMemoryUsedByWorker(WorkerSlot ws) {
         TopologyDetails topo = this.findTopologyUsingWorker(ws);
-        LOG.info("Topology {} using worker {}", topo, ws);
-        if(topo == null) {
+        if (topo == null) {
             return 0.0;
         }
         Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
-        LOG.info("Worker {} has execs: {}", ws, execs);
         double totalMemoryUsed = 0.0;
-        for(ExecutorDetails exec : execs) {
+        for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);
         }
         return totalMemoryUsed;
@@ -274,26 +272,23 @@ public class RAS_Node {
 
     public double getCpuUsedByWorker(WorkerSlot ws) {
         TopologyDetails topo = this.findTopologyUsingWorker(ws);
-        LOG.info("Topology {} using worker {}", topo, ws);
-        if(topo == null) {
+        if (topo == null) {
             return 0.0;
         }
         Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
-        LOG.info("Worker {} has execs: {}", ws, execs);
         double totalCpuUsed = 0.0;
-        for(ExecutorDetails exec : execs) {
+        for (ExecutorDetails exec : execs) {
             totalCpuUsed += topo.getTotalCpuReqTask(exec);
         }
         return totalCpuUsed;
     }
 
     public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
-        for(Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-            LOG.info("topoId: {} workers: {}", entry.getKey(), entry.getValue());
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             String topoId = entry.getKey();
             Set<WorkerSlot> workers = entry.getValue();
             for (WorkerSlot worker : workers) {
-                if(worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
+                if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
                     return _topologies.getById(topoId);
                 }
             }
@@ -376,7 +371,9 @@ public class RAS_Node {
 
     @Override
     public String toString() {
-        return "{Node: " + _sup.getHost() + ", AvailMem: " + _availMemory.toString() + ", AvailCPU: " + _availCPU.toString() + "}";
+        return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
+                + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+                + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
     }
 
     public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
@@ -415,7 +412,6 @@ public class RAS_Node {
         return total;
     }
 
-    //This function is only used for logging information
     public static Collection<ExecutorDetails> getExecutors(WorkerSlot ws, Cluster cluster) {
         Collection<ExecutorDetails> retList = new ArrayList<ExecutorDetails>();
         for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
index 42fc236..5a99bdd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -35,16 +35,11 @@ import java.util.Map;
 public class RAS_Nodes {
 
     private Map<String, RAS_Node> nodeMap;
-    private Cluster cluster;
-    private Topologies topologies;
 
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
 
-
     public RAS_Nodes(Cluster cluster, Topologies topologies) {
         this.nodeMap = getAllNodesFrom(cluster, topologies);
-        this.cluster = cluster;
-        this.topologies = topologies;
     }
 
     public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
@@ -142,7 +137,7 @@ public class RAS_Nodes {
         for (RAS_Node node : nodeMap.values()) {
             for (WorkerSlot ws : node.getUsedSlots()) {
                 if (workerSlots.contains(ws)) {
-                    LOG.info("freeing ws {} on node {}", ws, node);
+                    LOG.debug("freeing ws {} on node {}", ws, node);
                     node.free(ws);
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index f5c8354..53672f6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -32,8 +32,8 @@ import backtype.storm.scheduler.IScheduler;
 import backtype.storm.scheduler.Topologies;
 import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.scheduler.WorkerSlot;
-import backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,7 +55,7 @@ public class ResourceAwareScheduler implements IScheduler {
         private Map conf = new Config();
 
         public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
-            for(Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+            for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
                 String userId = userMapEntry.getKey();
                 User user = userMapEntry.getValue();
                 this.userMap.put(userId, user.getCopy());
@@ -84,61 +84,51 @@ public class ResourceAwareScheduler implements IScheduler {
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
-        LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
-        LOG.info("topologies: {}", topologies);
-
+        //initialize data structures
         this.initialize(topologies, cluster);
-
-        LOG.info("UserMap:\n{}", this.userMap);
+        //logs everything that is currently scheduled and the location at which they are scheduled
+        LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
+        //logs the resources available/used for every node
+        LOG.info("Nodes:\n{}", this.nodes);
+        //logs the detailed info about each user
         for (User user : this.getUserMap().values()) {
             LOG.info(user.getDetailedInfo());
         }
 
-        for (TopologyDetails topo : topologies.getTopologies()) {
-            LOG.info("topo {} status: {}", topo, cluster.getStatusMap().get(topo.getId()));
-        }
-
-        LOG.info("Nodes:\n{}", this.nodes);
-
-        //LOG.info("getNextUser: {}", this.getNextUser());
-
         ISchedulingPriorityStrategy schedulingPrioritystrategy = null;
         while (true) {
-            LOG.info("/*********** next scheduling iteration **************/");
 
-            if(schedulingPrioritystrategy == null) {
+            if (schedulingPrioritystrategy == null) {
                 try {
                     schedulingPrioritystrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
                 } catch (RuntimeException e) {
-                    LOG.error("failed to create instance of priority strategy: {} with error: {}! No topology eviction will be done.",
+                    LOG.error("failed to create instance of priority strategy: {} with error: {}! No topologies will be scheduled.",
                             this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage());
                     break;
                 }
             }
-            //need to re prepare since scheduling state might have been restored
-            schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
-            //Call scheduling priority strategy
-            TopologyDetails td = schedulingPrioritystrategy.getNextTopologyToSchedule();
-            if(td == null) {
+            TopologyDetails td = null;
+            try {
+                //need to re prepare since scheduling state might have been restored
+                schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                //Call scheduling priority strategy
+                td = schedulingPrioritystrategy.getNextTopologyToSchedule();
+            } catch (Exception e) {
+                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrack: {}"
+                        , schedulingPrioritystrategy.getClass().getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+                break;
+            }
+            if (td == null) {
                 break;
             }
             scheduleTopology(td);
         }
-
-        //since scheduling state might have been restored thus need to set the cluster and topologies.
-        cluster = this.cluster;
-        topologies = this.topologies;
     }
 
     public void scheduleTopology(TopologyDetails td) {
         User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
         if (cluster.getUnassignedExecutors(td).size() > 0) {
-            LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
-            LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
-            LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
-            LOG.info("Nodes:\n{}", this.nodes);
-            LOG.debug("From cluster:\n{}", ResourceUtils.printScheduling(this.cluster, this.topologies));
-            LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+            LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
 
             SchedulingState schedulingState = this.checkpointSchedulingState();
             IStrategy RAStrategy = null;
@@ -147,39 +137,59 @@ public class ResourceAwareScheduler implements IScheduler {
             } catch (RuntimeException e) {
                 LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
                         td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
-                topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                this.restoreCheckpointSchedulingState(schedulingState);
+                //since state is restored need the update User topologySubmitter to the new User object in userMap
+                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                topologySubmitter.moveTopoFromPendingToInvalid(td);
+                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+                        + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
                 return;
             }
             IEvictionStrategy evictionStrategy = null;
             while (true) {
-                //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
-                RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
-                SchedulingResult result = RAStrategy.schedule(td);
-                LOG.info("scheduling result: {}", result);
-                if (result.isValid()) {
+                SchedulingResult result = null;
+                try {
+                    //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
+                    RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                    result = RAStrategy.schedule(td);
+                } catch (Exception e) {
+                    LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: {}"
+                            , RAStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+                    this.restoreCheckpointSchedulingState(schedulingState);
+                    //since state is restored need the update User topologySubmitter to the new User object in userMap
+                    topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                    topologySubmitter.moveTopoFromPendingToInvalid(td);
+                    this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+                            + RAStrategy.getClass().getName() + ". Please check logs for details");
+                }
+                LOG.debug("scheduling result: {}", result);
+                if (result != null && result.isValid()) {
                     if (result.isSuccess()) {
                         try {
                             if (mkAssignment(td, result.getSchedulingResultMap())) {
-                                topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
+                                topologySubmitter.moveTopoFromPendingToRunning(td);
+                                this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                             } else {
                                 this.restoreCheckpointSchedulingState(schedulingState);
                                 //since state is restored need the update User topologySubmitter to the new User object in userMap
                                 topologySubmitter = this.userMap.get(td.getTopologySubmitter());
-                                topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                                topologySubmitter.moveTopoFromPendingToAttempted(td);
+                                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
                             }
                         } catch (IllegalStateException ex) {
                             LOG.error(ex.toString());
-                            LOG.error("Unsuccessful in scheduling: IllegalStateException thrown!", td.getId());
-                            this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
+                            LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Error: {} StackTrace: {}",
+                                    ex.getClass().getName(), Arrays.toString(ex.getStackTrace()));
                             this.restoreCheckpointSchedulingState(schedulingState);
                             //since state is restored need the update User topologySubmitter to the new User object in userMap
                             topologySubmitter = this.userMap.get(td.getTopologySubmitter());
-                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                            topologySubmitter.moveTopoFromPendingToAttempted(td);
+                            this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                         }
                         break;
                     } else {
                         if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
-                            if(evictionStrategy == null) {
+                            if (evictionStrategy == null) {
                                 try {
                                     evictionStrategy = (IEvictionStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
                                 } catch (RuntimeException e) {
@@ -189,16 +199,29 @@ public class ResourceAwareScheduler implements IScheduler {
                                     break;
                                 }
                             }
-                            //need to re prepare since scheduling state might have been restored
-                            evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
-                            if (!evictionStrategy.makeSpaceForTopo(td)) {
-                                this.cluster.setStatus(td.getId(), result.getErrorMessage());
+                            boolean madeSpace = false;
+                            try {
+                                //need to re prepare since scheduling state might have been restored
+                                evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                                madeSpace = evictionStrategy.makeSpaceForTopo(td);
+                            } catch (Exception e) {
+                                LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrack: {}"
+                                        , evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), Arrays.toString(e.getStackTrace()));
                                 this.restoreCheckpointSchedulingState(schedulingState);
                                 //since state is restored need the update User topologySubmitter to the new User object in userMap
                                 topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 break;
                             }
+                            if (!madeSpace) {
+                                LOG.debug("Could not make space for topo {} will move to attempted", td);
+                                this.restoreCheckpointSchedulingState(schedulingState);
+                                //since state is restored need the update User topologySubmitter to the new User object in userMap
+                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                                topologySubmitter.moveTopoFromPendingToAttempted(td);
+                                this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
+                                break;
+                            }
                             continue;
                         } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
                             this.restoreCheckpointSchedulingState(schedulingState);
@@ -225,13 +248,14 @@ public class ResourceAwareScheduler implements IScheduler {
             }
         } else {
             LOG.warn("Topology {} is already fully scheduled!", td.getName());
-            topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
-            throw new IllegalStateException("illegal");
+            topologySubmitter.moveTopoFromPendingToRunning(td);
+            if (this.cluster.getStatusMap().get(td.getId()) == null || this.cluster.getStatusMap().get(td.getId()).equals("")) {
+                this.cluster.setStatus(td.getId(), "Fully Scheduled");
+            }
         }
     }
 
     private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
-        LOG.info("making assignments for topology {}", td);
         if (schedulerAssignmentMap != null) {
             double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
             double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
@@ -241,7 +265,6 @@ public class ResourceAwareScheduler implements IScheduler {
             double assignedCpu = 0.0;
 
             Set<String> nodesUsed = new HashSet<String>();
-            int assignedWorkers = schedulerAssignmentMap.keySet().size();
             for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
                 WorkerSlot targetSlot = workerToTasksEntry.getKey();
                 Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
@@ -256,8 +279,6 @@ public class ResourceAwareScheduler implements IScheduler {
                 assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
                 assignedCpu += targetSlot.getAllocatedCpu();
             }
-            LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
-            this.cluster.setStatus(td.getId(), "Fully Scheduled");
 
             Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
@@ -266,17 +287,17 @@ public class ResourceAwareScheduler implements IScheduler {
                     td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
             this.cluster.setResources(td.getId(), resources);
+            updateSupervisorsResources(this.cluster, this.topologies);
             return true;
         } else {
             LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
             return false;
         }
-        updateSupervisorsResources(cluster, topologies);
     }
 
     private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
         Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
-        Map<String, RAS_Node> nodes = RAS_Node.getAllNodesFrom(cluster, topologies);
+        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
         for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
             RAS_Node node = entry.getValue();
             Double totalMem = node.getTotalMemoryResources();
@@ -297,42 +318,6 @@ public class ResourceAwareScheduler implements IScheduler {
         return this.userMap;
     }
 
-//    public User getNextUser() {
-//        Double least = Double.POSITIVE_INFINITY;
-//        User ret = null;
-//        for (User user : this.userMap.values()) {
-//            LOG.info("getNextUser {}", user.getDetailedInfo());
-//            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
-//            if (user.hasTopologyNeedSchedule()) {
-//                Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
-//                if(ret!=null) {
-//                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
-//                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
-//                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
-//
-//                }
-//                if (least > userResourcePoolAverageUtilization) {
-//                    ret = user;
-//                    least = userResourcePoolAverageUtilization;
-//                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
-//                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-//                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-//                    double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
-//
-//                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
-//                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
-//                    double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
-//                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
-//                    if (userAvgPercentage > currentAvgPercentage) {
-//                        ret = user;
-//                        least = userResourcePoolAverageUtilization;
-//                    }
-//                }
-//            }
-//        }
-//        return ret;
-//    }
-
     /**
      * Intialize scheduling and running queues
      *
@@ -340,33 +325,40 @@ public class ResourceAwareScheduler implements IScheduler {
      * @param cluster
      */
     private void initUsers(Topologies topologies, Cluster cluster) {
-
         this.userMap = new HashMap<String, User>();
         Map<String, Map<String, Double>> userResourcePools = this.getUserResourcePools();
-        LOG.info("userResourcePools: {}", userResourcePools);
+        LOG.debug("userResourcePools: {}", userResourcePools);
 
         for (TopologyDetails td : topologies.getTopologies()) {
+            //Get user that submitted topology.  If topology submitter is null or empty string, the topologySubmitter
+            //will be set to anonymous
             String topologySubmitter = td.getTopologySubmitter();
-            if (topologySubmitter == null) {
-                LOG.warn("Topology {} submitted by anonymous user", td.getName());
-                topologySubmitter = "anonymous";
+            //additional safety check to make sure that topologySubmitter is going to be a valid value
+            if (topologySubmitter == null || topologySubmitter.equals("")) {
+                LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", td.getName());
+                continue;
             }
             if (!this.userMap.containsKey(topologySubmitter)) {
                 this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
             }
-            if (cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
-                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td, cluster);
+            if (cluster.getUnassignedExecutors(td).size() > 0) {
+                LOG.debug("adding td: {} to pending queue", td.getName());
+                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
             } else {
-                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td, cluster);
+                LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
+                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
+                if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
+                    cluster.setStatus(td.getId(), "Fully Scheduled");
+                }
             }
         }
     }
 
     private void initialize(Topologies topologies, Cluster cluster) {
-        initUsers(topologies, cluster);
         this.cluster = cluster;
         this.topologies = topologies;
         this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+        initUsers(topologies, cluster);
     }
 
     /**
@@ -403,30 +395,36 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     private SchedulingState checkpointSchedulingState() {
-        LOG.info("checkpointing scheduling state...");
-        LOG.info("/*********Checkpoint************/");
+        LOG.debug("/*********Checkpoint scheduling state************/");
         for (User user : this.getUserMap().values()) {
-            LOG.info(user.getDetailedInfo());
+            LOG.debug(user.getDetailedInfo());
         }
-        LOG.info("/*********End************/");
+        LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
+        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug("/*********End************/");
         return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
     }
 
     private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
-        LOG.info("restoring scheduling state...");
-        LOG.info("/*********Before************/");
-        for (User user : this.getUserMap().values()) {
-            LOG.info(user.getDetailedInfo());
-        }
-        this.cluster = schedulingState.cluster;
+        LOG.debug("/*********restoring scheduling state************/");
+        //reseting cluster
+        //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
+        this.cluster.setAssignments(schedulingState.cluster.getAssignments());
+        this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
+        this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
+        this.cluster.setResourcesMap(schedulingState.cluster.getResourcesMap());
+        //don't need to explicitly set data structues like Cluster since nothing can really be changed
+        //unless this.topologies is set to another object
         this.topologies = schedulingState.topologies;
         this.conf = schedulingState.conf;
         this.userMap = schedulingState.userMap;
         this.nodes = schedulingState.nodes;
-        LOG.info("/*********After************/");
+
         for (User user : this.getUserMap().values()) {
-            LOG.info(user.getDetailedInfo());
+            LOG.debug(user.getDetailedInfo());
         }
-        LOG.info("/*********End************/");
+        LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
+        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug("/*********End************/");
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 2d7c79f..8542120 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -58,20 +58,26 @@ public class User {
         if (resourcePool != null) {
             this.resourcePool.putAll(resourcePool);
         }
+        if (this.resourcePool.get("cpu") == null) {
+            this.resourcePool.put("cpu", 0.0);
+        }
+        if (this.resourcePool.get("memory") == null) {
+            this.resourcePool.put("memory", 0.0);
+        }
     }
 
     public User getCopy() {
         User newUser = new User(this.userId, this.resourcePool);
-        for(TopologyDetails topo :  this.pendingQueue) {
+        for (TopologyDetails topo : this.pendingQueue) {
             newUser.addTopologyToPendingQueue(topo);
         }
-        for(TopologyDetails topo :  this.runningQueue) {
+        for (TopologyDetails topo : this.runningQueue) {
             newUser.addTopologyToRunningQueue(topo);
         }
-        for(TopologyDetails topo :  this.attemptedQueue) {
+        for (TopologyDetails topo : this.attemptedQueue) {
             newUser.addTopologyToAttemptedQueue(topo);
         }
-        for(TopologyDetails topo :  this.invalidQueue) {
+        for (TopologyDetails topo : this.invalidQueue) {
             newUser.addTopologyToInvalidQueue(topo);
         }
         return newUser;
@@ -134,6 +140,7 @@ public class User {
         ret.addAll(this.invalidQueue);
         return ret;
     }
+
     public Map<String, Number> getResourcePool() {
         if (this.resourcePool != null) {
             return new HashMap<String, Number>(this.resourcePool);
@@ -190,7 +197,7 @@ public class User {
 
 
     private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
-        LOG.info("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
+        LOG.debug("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
         if (topo == null) {
             return;
         }
@@ -204,49 +211,49 @@ public class User {
         }
         src.remove(topo);
         dest.add(topo);
-        LOG.info("SRC: {}", src);
-        LOG.info("DEST: {}", dest);
     }
 
 
-    public Double getResourcePoolAverageUtilization() {
-        List<Double> resourceUilitzationList = new LinkedList<Double>();
+    public double getResourcePoolAverageUtilization() {
         Double cpuResourcePoolUtilization = this.getCPUResourcePoolUtilization();
         Double memoryResourcePoolUtilization = this.getMemoryResourcePoolUtilization();
 
         if (cpuResourcePoolUtilization != null && memoryResourcePoolUtilization != null) {
-            return (cpuResourcePoolUtilization + memoryResourcePoolUtilization) / 2.0;
+            //cannot be (cpuResourcePoolUtilization + memoryResourcePoolUtilization)/2
+            //since memoryResourcePoolUtilization or cpuResourcePoolUtilization can be Double.MAX_VALUE
+            //Should not return infinity in that case
+            return ((cpuResourcePoolUtilization) / 2.0) + ((memoryResourcePoolUtilization) / 2.0);
         }
         return Double.MAX_VALUE;
     }
 
-    public Double getCPUResourcePoolUtilization() {
+    public double getCPUResourcePoolUtilization() {
         Double cpuGuarantee = this.resourcePool.get("cpu");
-        if (cpuGuarantee != null) {
-            return this.getCPUResourceUsedByUser() / cpuGuarantee;
+        if (cpuGuarantee == null || cpuGuarantee == 0.0) {
+            return Double.MAX_VALUE;
         }
-        return null;
+        return this.getCPUResourceUsedByUser() / cpuGuarantee;
     }
 
-    public Double getMemoryResourcePoolUtilization() {
+    public double getMemoryResourcePoolUtilization() {
         Double memoryGuarantee = this.resourcePool.get("memory");
-        if (memoryGuarantee != null) {
-            return this.getMemoryResourceUsedByUser() / memoryGuarantee;
+        if (memoryGuarantee == null || memoryGuarantee == 0.0) {
+            return Double.MAX_VALUE;
         }
-        return null;
+        return this.getMemoryResourceUsedByUser() / memoryGuarantee;
     }
 
 
-    public Double getCPUResourceUsedByUser() {
-        Double sum = 0.0;
+    public double getCPUResourceUsedByUser() {
+        double sum = 0.0;
         for (TopologyDetails topo : this.runningQueue) {
             sum += topo.getTotalRequestedCpu();
         }
         return sum;
     }
 
-    public Double getMemoryResourceUsedByUser() {
-        Double sum = 0.0;
+    public double getMemoryResourceUsedByUser() {
+        double sum = 0.0;
         for (TopologyDetails topo : this.runningQueue) {
             sum += topo.getTotalRequestedMemOnHeap() + topo.getTotalRequestedMemOffHeap();
         }
@@ -271,7 +278,6 @@ public class User {
     }
 
     public boolean hasTopologyNeedSchedule() {
-        //return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
         return (!this.pendingQueue.isEmpty());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 7ca7ac3..f0401ce 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -50,54 +50,50 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
 
     @Override
     public boolean makeSpaceForTopo(TopologyDetails td) {
-        LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
+        LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
         User submitter = this.userMap.get(td.getTopologySubmitter());
         if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
             return false;
         }
-
         double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
         double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
 
+        User evictUser = this.findUserWithMostResourcesAboveGuarantee();
         //user has enough resource under his or her resource guarantee to schedule topology
         if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
-            User evictUser = this.findUserWithMostResourcesAboveGuarantee();
-            if (evictUser == null) {
-                LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
-                submitter.moveTopoFromPendingToAttempted(td, this.cluster);
+            if (evictUser != null) {
 
-                return false;
+                TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                evictTopology(topologyEvict);
+                return true;
             }
-            TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
-            LOG.info("topology to evict: {}", topologyEvict);
-            evictTopology(topologyEvict);
-
-            LOG.info("Resources After eviction:\n{}", this.nodes);
-
-            return true;
         } else {
-
-            if ((1.0 - submitter.getCPUResourcePoolUtilization()) < cpuNeeded) {
-
+            if (evictUser != null) {
+                if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (cpuNeeded + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
+                    TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
+                    evictTopology(topologyEvict);
+                    return true;
+                }
             }
-
-            if ((1.0 - submitter.getMemoryResourcePoolUtilization()) < memoryNeeded) {
-
+        }
+        //See if there is a lower priority topology that can be evicted from the current user
+        for (TopologyDetails topo : submitter.getTopologiesRunning()) {
+            //check to if there is a topology with a lower priority we can evict
+            if (topo.getTopologyPriority() > td.getTopologyPriority()) {
+                evictTopology(topo);
+                return true;
             }
-            return false;
-
         }
+        return false;
     }
 
     private void evictTopology(TopologyDetails topologyEvict) {
         Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
         User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
 
-        LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
-        LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
+        LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, topologyEvict.getTopologySubmitter());
         this.nodes.freeSlots(workersToEvict);
         submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
-        LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
     }
 
     private User findUserWithMostResourcesAboveGuarantee() {

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 5096fd6..990ccd6 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-public class DefaultSchedulingPriorityStrategy implements  ISchedulingPriorityStrategy{
+public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultSchedulingPriorityStrategy.class);
 
@@ -58,20 +58,15 @@ public class DefaultSchedulingPriorityStrategy implements  ISchedulingPrioritySt
         Double least = Double.POSITIVE_INFINITY;
         User ret = null;
         for (User user : this.userMap.values()) {
-            LOG.info("getNextUser {}", user.getDetailedInfo());
-            LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
             if (user.hasTopologyNeedSchedule()) {
                 Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
-                if(ret!=null) {
-                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
-                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
-                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
 
-                }
                 if (least > userResourcePoolAverageUtilization) {
                     ret = user;
                     least = userResourcePoolAverageUtilization;
-                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+                }
+                // if ResourcePoolAverageUtilization is equal to the user that is being compared
+                else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
                     double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
                     double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
                     double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
@@ -79,7 +74,6 @@ public class DefaultSchedulingPriorityStrategy implements  ISchedulingPrioritySt
                     double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
                     double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
                     double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
-                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
                     if (userAvgPercentage > currentAvgPercentage) {
                         ret = user;
                         least = userResourcePoolAverageUtilization;

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index 7e92b3d..a5b0ff5 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -34,7 +34,7 @@ public interface ISchedulingPriorityStrategy {
     public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
 
     /**
-     *
+     * Gets the next topology to schedule
      * @return return the next topology to schedule.  If there is no topologies left to schedule, return null
      */
     public TopologyDetails getNextTopologyToSchedule();

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 1950858..75cc5eb 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -170,10 +170,10 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             LOG.error("Not all executors successfully scheduled: {}",
                     executorsNotScheduled);
             schedulerAssignmentMap = null;
-            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "Not all executors successfully scheduled: " + executorsNotScheduled);
+            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, (td.getExecutors().size() - unassignedExecutors.size()) + "/" + td.getExecutors().size() + " executors scheduled");
         } else {
             LOG.debug("All resources successfully scheduled!");
-            result = SchedulingResult.success(schedulerAssignmentMap);
+            result = SchedulingResult.successWithMsg(schedulerAssignmentMap, "Fully Scheduled by DefaultResourceAwareStrategy");
         }
         if (schedulerAssignmentMap == null) {
             LOG.error("Topology {} not successfully scheduled!", td.getId());

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 12e8ff3..bb2e955 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -37,7 +37,18 @@ import backtype.storm.scheduler.resource.User;
  */
 public interface IStrategy {
 
+    /**
+     * initialize prior to scheduling
+     */
     public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
 
+    /**
+     * This method is invoked to calcuate a scheduling for topology td
+     * @param td
+     * @return returns a SchedulingResult object containing SchedulingStatus object to indicate whether scheduling is successful
+     * The strategy must calculate a scheduling in the format of Map<WorkerSlot, Collection<ExecutorDetails>> where the key of
+     * this map is the worker slot that the value (collection of executors) should be assigned to. if a scheduling is calculated successfully,
+     * put the scheduling map in the SchedulingResult object.
+     */
     public SchedulingResult schedule(TopologyDetails td);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index b82a4ec..57c9f40 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -507,14 +507,14 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            if(o == null) {
+            if (o == null) {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map) o).containsKey("cpu") ) {
-                throw new IllegalArgumentException( "Field " + name + " must have map entry with key: cpu");
+            if (!((Map) o).containsKey("cpu")) {
+                throw new IllegalArgumentException("Field " + name + " must have map entry with key: cpu");
             }
-            if(!((Map) o).containsKey("memory") ) {
+            if (!((Map) o).containsKey("memory")) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: memory");
             }
 
@@ -533,13 +533,13 @@ public class ConfigValidation {
 
         @Override
         public void validateField(String name, Object o) {
-            if(o == null) {
+            if (o == null) {
                 return;
             }
             SimpleTypeValidator.validateField(name, String.class, o);
             try {
                 Class objectClass = Class.forName((String) o);
-                if(!this.classImplements.isAssignableFrom(objectClass)) {
+                if (!this.classImplements.isAssignableFrom(objectClass)) {
                     throw new IllegalArgumentException("Field " + name + " with value " + o + " does not implement " + this.classImplements.getName());
                 }
             } catch (ClassNotFoundException e) {