Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:43 UTC

[15/23] storm git commit: making edits based on review comments

making edits based on review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7f69135
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7f69135
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7f69135

Branch: refs/heads/master
Commit: a7f691350b435068f042ba1e9b83d1c317a4eaf7
Parents: 9a542a6
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Dec 15 16:46:18 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Tue Dec 15 23:53:28 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +
 .../Resource_Aware_Scheduler_overview.md        |  17 +-
 .../starter/ResourceAwareExampleTopology.java   |   5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  15 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   2 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |  17 +-
 .../backtype/storm/scheduler/Topologies.java    |   4 +-
 .../storm/scheduler/TopologyDetails.java        |  76 +++----
 .../storm/scheduler/resource/RAS_Node.java      | 215 +++++++++----------
 .../storm/scheduler/resource/RAS_Nodes.java     |   5 +-
 .../resource/ResourceAwareScheduler.java        |  14 +-
 .../eviction/DefaultEvictionStrategy.java       |   1 -
 .../DefaultSchedulingPriorityStrategy.java      |   1 -
 .../DefaultResourceAwareStrategy.java           |   3 +-
 .../strategies/scheduling/IStrategy.java        |   4 +-
 .../storm/validation/ConfigValidation.java      |   3 +-
 .../resource/TestResourceAwareScheduler.java    |  10 -
 .../TestUtilsForResourceAwareScheduler.java     |   3 -
 18 files changed, 191 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 448db46..1297e3e 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -251,6 +251,8 @@ topology.disruptor.batch.timeout.millis: 1
 topology.disable.loadaware: false
 
 # Configs for Resource Aware Scheduler
+# Topology priority: lower numbers mean higher importance (0 is the highest priority, and importance decreases as the priority number increases).
+# The recommended range is 0-29, but no hard limit is enforced.
 topology.priority: 29
 topology.component.resources.onheap.memory.mb: 128.0
 topology.component.resources.offheap.memory.mb: 0.0

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/docs/documentation/Resource_Aware_Scheduler_overview.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Resource_Aware_Scheduler_overview.md b/docs/documentation/Resource_Aware_Scheduler_overview.md
index f0ffd44..0f5b8cb 100644
--- a/docs/documentation/Resource_Aware_Scheduler_overview.md
+++ b/docs/documentation/Resource_Aware_Scheduler_overview.md
@@ -61,6 +61,8 @@ Example of Usage:
     s1.setCPULoad(15.0);
     builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                 .shuffleGrouping("word").setCPULoad(10.0);
+    builder.setBolt("exclaim2", new HeavyBolt(), 1)
+                    .shuffleGrouping("exclaim1").setCPULoad(450.0);
 
 ###	Limiting the Heap Size per Worker (JVM) Process
 
@@ -70,7 +72,7 @@ Example of Usage:
 Parameters:
 * Number size – The memory limit a worker process will be allocated in megabytes
 
-The user can limit the amount of memory resources the resource aware scheduler that is allocated to a single worker on a per topology basis by using the above API.  This API is in place so that the users can spread executors to multiple workers.  However, spreading workers to multiple workers may increase the communication latency since executors will not be able to use Disruptor Queue for intra-process communication.
+The user can limit the amount of memory resources the resource aware scheduler allocates to a single worker on a per topology basis by using the above API.  This API is in place so that the users can spread executors to multiple workers.  However, spreading executors to multiple workers may increase the communication latency since executors will not be able to use Disruptor Queue for intra-process communication.
 
 Example of Usage:
 
@@ -116,7 +118,7 @@ The user can set some default configurations for the Resource Aware Scheduler in
 
 # Topology Priorities and Per User Resource 
 
-The next step for the Resource Aware Scheduler or RAS is to enable it to have multitenant capabilities since many Storm users typically share a Storm cluster.  Resource Aware Scheduler needs to be able to allocate resources on a per user basis.  Each user can be guaranteed a certain amount of resources to run his or her topologies and the Resource Aware Scheduler should meet those guarantees when possible.  When the Storm cluster has extra free resources, Resource Aware Scheduler needs to be able allocate additional resources to user in a fair manner. The importance of topologies can also vary.  Topologies can be used for actual production or just experimentation, thus Resource Aware Scheduler should take into account the importance of a topology when determining the order in which to schedule topologies or when to evict topologies
+The Resource Aware Scheduler (RAS) also has multitenant capabilities, since many Storm users typically share a Storm cluster.  The Resource Aware Scheduler can allocate resources on a per-user basis.  Each user can be guaranteed a certain amount of resources to run his or her topologies, and the Resource Aware Scheduler will meet those guarantees when possible.  When the Storm cluster has extra free resources, the Resource Aware Scheduler will be able to allocate additional resources to users in a fair manner.  The importance of topologies can also vary.  Topologies can be used for actual production or just experimentation, so the Resource Aware Scheduler takes into account the importance of a topology when determining the order in which to schedule topologies or when to evict topologies.
 
 ## Setup
 
@@ -158,9 +160,11 @@ Thus, each priority level contains 10 sub priorities. Users can set the priority
 Parameters:
 * priority – an integer representing the priority of the topology
 
+Please note that the 0-29 range is not a hard limit.  Thus, a user can set a priority number that is higher than 29.  However, the rule still holds: the higher the priority number, the lower the importance.
+
 ### Specifying Scheduling Strategy:
 
-A user can specify on a per topology basis what scheduling strategy to use.  Users can implement the IStrategy interface and define a new strategies to schedule specific topologies.  This pluggable interface was created since we realize different topologies might have different scheduling needs.  A user can set the topology strategy within the topology definition by using the API:
+A user can specify on a per topology basis what scheduling strategy to use.  Users can implement the IStrategy interface and define new strategies to schedule specific topologies.  This pluggable interface was created since we realize different topologies might have different scheduling needs.  A user can set the topology strategy within the topology definition by using the API:
 
     public void setTopologyStrategy(Class<? extends IStrategy> clazz)
     
@@ -185,7 +189,7 @@ A default strategy will be provided.  The following explains how the default sch
 
 **DefaultSchedulingPriorityStrategy**
 
-The order of scheduling should be based on the distance a user’s current resource allocation to his or her guaranteed allocation.  We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set resource guarantees, so how can we compare them in a fair manner?  Let's use the average percentage of resource guarantees satisfied as a method of comparison.
+The order of scheduling should be based on the distance between a user’s current resource allocation and his or her guaranteed allocation.  We should prioritize the users who are the furthest away from their resource guarantee. The difficulty of this problem is that a user may have multiple resource guarantees, and another user can have another set of resource guarantees, so how can we compare them in a fair manner?  Let's use the average percentage of resource guarantees satisfied as a method of comparison.
 
 For example:
 
@@ -204,10 +208,10 @@ User B’s average percentage satisfied of resource guarantee:
 
 Thus, in this example User A has a smaller average percentage of his or her resource guarantee satisfied than User B.  Thus, User A should get priority to be allocated more resource, i.e., schedule a topology submitted by User A.
 
-When scheduling, RAS sort users by the average percentage satisfied of resource guarantee and schedule topologies from users based on that ordering starting from the users with the lowest average percentage satisfied of resource guarantee.  When a user’s resource guarantee is completely satisfied, the user’s average percentage satisfied of resource guarantee will be greater than or equal to 1.
+When scheduling, RAS sorts users by the average percentage satisfied of resource guarantee and schedules topologies from users based on that ordering, starting from the users with the lowest average percentage satisfied of resource guarantee.  When a user’s resource guarantee is completely satisfied, the user’s average percentage satisfied of resource guarantee will be greater than or equal to 1.
 
 ### Specifying Eviction Strategy
-The strategy for evicting topologies is also a pluggable interface in which the user can implement his or her own topology eviction strategy.  For a user to implement his or her own eviction strategy, he or she needs to implement the IEvictionStrategy Interface and set *Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented strategy class. For instance:
+The eviction strategy is used when there are not enough free resources in the cluster to schedule new topologies. If the cluster is full, we need a mechanism to evict topologies so that user resource guarantees can be met and additional resources can be shared fairly among users. The strategy for evicting topologies is also a pluggable interface in which the user can implement his or her own topology eviction strategy.  To implement a custom eviction strategy, a user needs to implement the IEvictionStrategy interface and set *Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY* to point to the implemented strategy class. For instance:
 
     resource.aware.scheduler.eviction.strategy: "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy"
 
@@ -215,7 +219,6 @@ A default eviction strategy is provided.  The following explains how the default
 
 **DefaultEvictionStrategy**
 
-If the cluster is full, we need a mechanism to evict topologies so that user resource guarantees can be met and additional resource can be shared fairly among users
 
 To determine if topology eviction should occur we should take into account the priority of the topology that we are trying to schedule and whether the resource guarantees for the owner of the topology have been met.  
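
The user ordering described for DefaultSchedulingPriorityStrategy in the documentation hunks above can be sketched in a few lines of Java. This is only an illustration and not the Storm implementation: the class `GuaranteeOrderingSketch`, the record `UserShare`, and all numbers are hypothetical, and the sketch assumes exactly two guaranteed resources (CPU and memory) that are averaged with equal weight.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; this is not Storm's DefaultSchedulingPriorityStrategy.
class GuaranteeOrderingSketch {

    // Hypothetical per-user record of guaranteed and currently allocated resources.
    static final class UserShare {
        final String name;
        final double cpuGuarantee;
        final double cpuUsed;
        final double memGuarantee;
        final double memUsed;

        UserShare(String name, double cpuGuarantee, double cpuUsed,
                  double memGuarantee, double memUsed) {
            this.name = name;
            this.cpuGuarantee = cpuGuarantee;
            this.cpuUsed = cpuUsed;
            this.memGuarantee = memGuarantee;
            this.memUsed = memUsed;
        }

        // Average fraction of the guarantees currently satisfied.
        // Assumes both guarantees are greater than zero.
        double avgSatisfied() {
            return (cpuUsed / cpuGuarantee + memUsed / memGuarantee) / 2.0;
        }
    }

    // Returns user names ordered from least satisfied to most satisfied,
    // i.e. the order in which their topologies would be considered.
    static List<String> schedulingOrder(List<UserShare> users) {
        List<UserShare> sorted = new ArrayList<>(users);
        sorted.sort(Comparator.comparingDouble(UserShare::avgSatisfied));
        List<String> names = new ArrayList<>();
        for (UserShare user : sorted) {
            names.add(user.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<UserShare> users = new ArrayList<>();
        // userB: (80/100 + 900/1000) / 2 = 0.85
        users.add(new UserShare("userB", 100.0, 80.0, 1000.0, 900.0));
        // userA: (50/200 + 1000/2000) / 2 = 0.375
        users.add(new UserShare("userA", 200.0, 50.0, 2000.0, 1000.0));
        System.out.println(schedulingOrder(users)); // prints [userA, userB]
    }
}
```

A value of 1.0 or more from `avgSatisfied()` corresponds to a user whose guarantees are fully met, which matches the "greater than or equal to 1" remark in the documentation text.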
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
index 7104281..0fb3724 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -82,8 +82,9 @@ public class ResourceAwareExampleTopology {
      */
     conf.setTopologyWorkerMaxHeapSize(1024.0);
 
-    // Set topology priority 0-30 with 0 being the highest priority and 30 being the lowest priority.
-    conf.setTopologyPriority(30);
+    // Topology priority: lower numbers mean higher importance (0 is the highest priority, and importance decreases as the priority number increases).
+    // The recommended range is 0-29, but no hard limit is enforced.
+    conf.setTopologyPriority(29);
 
     // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
     conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index d76825d..0885ce3 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -533,24 +533,21 @@
       (.readBlob blob-store (master-stormconf-key storm-id) nimbus-subject))))
 
 (defn read-topology-details [nimbus storm-id]
-  (let [conf (:conf nimbus)
-        blob-store (:blob-store nimbus)
-        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
+  (let [blob-store (:blob-store nimbus)
+        storm-base (or
+                     (.storm-base (:storm-cluster-state nimbus) storm-id nil)
+                     (throw (NotAliveException. storm-id)))
         topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
         executor->component (->> (compute-executor->component nimbus storm-id)
                                  (map-key (fn [[start-task end-task]]
-                                            (ExecutorDetails. (int start-task) (int end-task)))))
-        launch-time-secs (if storm-base (:launch-time-secs storm-base)
-                           (throw
-                             (NotAliveException. (str storm-id))))]
+                                            (ExecutorDetails. (int start-task) (int end-task)))))]
     (TopologyDetails. storm-id
                       topology-conf
                       topology
                       (:num-workers storm-base)
                       executor->component
-                      launch-time-secs
-                      )))
+                      (:launch-time-secs storm-base))))
 
 ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
 ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index dacf4f8..6c31c19 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1861,7 +1861,7 @@ public class Config extends HashMap<String, Object> {
      * Sets the priority for a topology
      */
     @isInteger
-    @isPositiveNumber
+    @isPositiveNumber(includeZero = true)
     public static final String TOPOLOGY_PRIORITY = "topology.priority";
 
     /**
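
The switch to `@isPositiveNumber(includeZero = true)` in the hunk above matters because 0 is documented as the highest topology priority and must therefore pass validation. The sketch below shows the intended check in isolation. It is a hypothetical stand-in: `PriorityValidationSketch` and its methods are invented for illustration and do not reproduce Storm's `ConfigValidation` code; treating an unset value as valid is also an assumption.

```java
// Hypothetical stand-in for a config check; it does not use Storm's ConfigValidation classes.
class PriorityValidationSketch {

    // Throws if the value is not an integer, or is negative
    // (or zero, when includeZero is false).
    static void validate(String name, Object value, boolean includeZero) {
        if (value == null) {
            return; // assumption: an unset option is treated as valid
        }
        if (!(value instanceof Integer) && !(value instanceof Long)) {
            throw new IllegalArgumentException("Field " + name + " must be an integer");
        }
        long n = ((Number) value).longValue();
        boolean ok = includeZero ? n >= 0 : n > 0;
        if (!ok) {
            throw new IllegalArgumentException(
                    "Field " + name + " must be a positive number"
                            + (includeZero ? " or zero" : ""));
        }
    }

    // Convenience wrapper that reports the result as a boolean.
    static boolean isValid(Object value, boolean includeZero) {
        try {
            validate("topology.priority", value, includeZero);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(0, true));   // true: zero accepted
        System.out.println(isValid(0, false));  // false: zero rejected
        System.out.println(isValid(29, true));  // true
        System.out.println(isValid(-1, true));  // false: negative priority
    }
}
```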

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 92b2219..53fdaa4 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -28,8 +28,6 @@ import java.util.Set;
 import backtype.storm.Config;
 import backtype.storm.networktopography.DNSToSwitchMapping;
 import backtype.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Cluster {
 
@@ -96,15 +94,15 @@ public class Cluster {
     /**
      * Get a copy of this cluster object
      */
-    public Cluster getCopy() {
+    public static Cluster getCopy(Cluster cluster) {
         HashMap<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
-        for (Map.Entry<String, SchedulerAssignmentImpl> entry : this.assignments.entrySet()) {
+        for (Map.Entry<String, SchedulerAssignmentImpl> entry : cluster.assignments.entrySet()) {
             newAssignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
         }
         Map newConf = new HashMap<String, Object>();
-        newConf.putAll(this.conf);
-        Cluster copy = new Cluster(this.inimbus, this.supervisors, newAssignments, newConf);
-        copy.status = new HashMap<>(this.status);
+        newConf.putAll(cluster.conf);
+        Cluster copy = new Cluster(cluster.inimbus, cluster.supervisors, newAssignments, newConf);
+        copy.status = new HashMap<>(cluster.status);
         return copy;
     }
     
@@ -622,14 +620,12 @@ public class Cluster {
     /**
      * set scheduler status for a topology
      */
-    private static final Logger LOG = LoggerFactory
-            .getLogger(Cluster.class);
     public void setStatus(String topologyId, String status) {
         this.status.put(topologyId, status);
     }
 
     /**
-     * Get all schedule statuses
+     * Get all topology scheduler statuses
      */
     public Map<String, String> getStatusMap() {
         return this.status;
@@ -639,6 +635,7 @@ public class Cluster {
      * set scheduler status map
      */
     public void setStatusMap(Map<String, String> statusMap) {
+        this.status.clear();
         this.status.putAll(statusMap);
     }
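
Two behavioural points in the Cluster.java hunks above are easy to miss: `getCopy` becomes a static method that takes the object to copy, and `setStatusMap` now clears the existing statuses before `putAll`, so it replaces the map instead of merging into it. The following reduced sketch shows both under simplified assumptions. `StatusMapSketch` is a hypothetical class that keeps only the status map; it is not the real `Cluster`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the status handling; not the real Cluster class.
class StatusMapSketch {
    private final Map<String, String> status = new HashMap<>();

    void setStatus(String topologyId, String value) {
        this.status.put(topologyId, value);
    }

    // Replace semantics: stale entries are dropped before the new ones are added.
    void setStatusMap(Map<String, String> statusMap) {
        this.status.clear();
        this.status.putAll(statusMap);
    }

    Map<String, String> getStatusMap() {
        return this.status;
    }

    // Static copy method: the copy gets its own map, so later changes
    // to the copy do not leak back into the source object.
    static StatusMapSketch getCopy(StatusMapSketch source) {
        StatusMapSketch copy = new StatusMapSketch();
        copy.status.putAll(source.status);
        return copy;
    }

    public static void main(String[] args) {
        StatusMapSketch original = new StatusMapSketch();
        original.setStatus("topo-1", "Running");

        StatusMapSketch copy = StatusMapSketch.getCopy(original);
        Map<String, String> replacement = new HashMap<>();
        replacement.put("topo-2", "Fully Scheduled");
        copy.setStatusMap(replacement);

        System.out.println(copy.getStatusMap());     // prints {topo-2=Fully Scheduled}
        System.out.println(original.getStatusMap()); // prints {topo-1=Running}
    }
}
```

Without the `clear()` call, the copy in this example would still carry the `topo-1` entry after `setStatusMap`.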
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index b6fbd07..3c8f987 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -68,8 +68,8 @@ public class Topologies {
         return _allComponents;
     }
 
-    public Topologies getCopy() {
-        return new Topologies(this.topologies);
+    public static Topologies getCopy(Topologies topologies) {
+        return new Topologies(topologies.topologies);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 166493f..9e35981 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -45,7 +45,7 @@ public class TopologyDetails {
     private Map<ExecutorDetails, String> executorToComponent;
     private int numWorkers;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
-    private Map<ExecutorDetails, Map<String, Double>> _resourceList;
+    private Map<ExecutorDetails, Map<String, Double>> resourceList;
     //Max heap size for a worker used by topology
     private Double topologyWorkerMaxHeapSize;
     //topology priority
@@ -56,31 +56,33 @@ public class TopologyDetails {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) {
-        this.topologyId = topologyId;
-        this.topologyConf = topologyConf;
-        this.topology = topology;
-        this.numWorkers = numWorkers;
+        this(topologyId, topologyConf, topology,  numWorkers,  null, 0);
     }
 
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
                            int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
-        this(topologyId, topologyConf, topology, numWorkers);
+        this(topologyId, topologyConf, topology,  numWorkers,  executorToComponents, 0);
+    }
+
+    public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
+                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) {
+        this.topologyId = topologyId;
+        this.topologyConf = topologyConf;
+        this.topology = topology;
+        this.numWorkers = numWorkers;
         this.executorToComponent = new HashMap<>(0);
         if (executorToComponents != null) {
             this.executorToComponent.putAll(executorToComponents);
         }
-        this.initResourceList();
+        if (this.topology != null) {
+            this.initResourceList();
+        }
         this.initConfigs();
-    }
-
-    public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology,
-                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, int launchTime) {
-        this(topologyId, topologyConf, topology, numWorkers, executorToComponents);
         this.launchTime = launchTime;
     }
 
     public String getId() {
-        return topologyId;
+        return this.topologyId;
     }
 
     public String getName() {
@@ -88,11 +90,11 @@ public class TopologyDetails {
     }
 
     public Map getConf() {
-        return topologyConf;
+        return this.topologyConf;
     }
 
     public int getNumWorkers() {
-        return numWorkers;
+        return this.numWorkers;
     }
 
     public Map<ExecutorDetails, String> getExecutorToComponent() {
@@ -116,7 +118,7 @@ public class TopologyDetails {
     }
 
     private void initResourceList() {
-        _resourceList = new HashMap<>();
+        this.resourceList = new HashMap<>();
         // Extract bolt memory info
         if (this.topology.get_bolts() != null) {
             for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
@@ -124,9 +126,9 @@ public class TopologyDetails {
                 Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
                         .getValue().get_common().get_json_conf());
                 ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
-                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : this.executorToComponent.entrySet()) {
                     if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
-                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                        this.resourceList.put(anExecutorToComponent.getKey(), topology_resources);
                     }
                 }
             }
@@ -137,18 +139,18 @@ public class TopologyDetails {
                 Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
                         .getValue().get_common().get_json_conf());
                 ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
-                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
+                for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : this.executorToComponent.entrySet()) {
                     if (spout.getKey().equals(anExecutorToComponent.getValue())) {
-                        _resourceList.put(anExecutorToComponent.getKey(), topology_resources);
+                        this.resourceList.put(anExecutorToComponent.getKey(), topology_resources);
                     }
                 }
             }
         } else {
-            LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
+            LOG.warn("Topology " + this.topologyId + " does not seem to have any spouts!");
         }
         //schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
         for(ExecutorDetails exec : this.getExecutors()) {
-            if (!_resourceList.containsKey(exec)) {
+            if (!this.resourceList.containsKey(exec)) {
                 LOG.debug(
                         "Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
                         this.getExecutorToComponent().get(exec),
@@ -163,7 +165,7 @@ public class TopologyDetails {
 
     private List<ExecutorDetails> componentToExecs(String comp) {
         List<ExecutorDetails> execs = new ArrayList<>();
-        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
+        for (Map.Entry<ExecutorDetails, String> entry : this.executorToComponent.entrySet()) {
             if (entry.getValue().equals(comp)) {
                 execs.add(entry.getKey());
             }
@@ -264,7 +266,7 @@ public class TopologyDetails {
     public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
         Double ret = null;
         if (hasExecInTopo(exec)) {
-            ret = _resourceList
+            ret = this.resourceList
                     .get(exec)
                     .get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
         }
@@ -281,7 +283,7 @@ public class TopologyDetails {
     public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
         Double ret = null;
         if (hasExecInTopo(exec)) {
-            ret = _resourceList
+            ret = this.resourceList
                     .get(exec)
                     .get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         }
@@ -310,7 +312,7 @@ public class TopologyDetails {
      */
     public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
         Map<ExecutorDetails, Double> ret = new HashMap<>();
-        for (ExecutorDetails exec : _resourceList.keySet()) {
+        for (ExecutorDetails exec : this.resourceList.keySet()) {
             ret.put(exec, getTotalMemReqTask(exec));
         }
         return ret;
@@ -322,7 +324,7 @@ public class TopologyDetails {
      */
     public Double getTotalCpuReqTask(ExecutorDetails exec) {
         if (hasExecInTopo(exec)) {
-            return _resourceList
+            return this.resourceList
                     .get(exec)
                     .get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
         }
@@ -387,17 +389,17 @@ public class TopologyDetails {
      */
     public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
         if (hasExecInTopo(exec)) {
-            return _resourceList.get(exec);
+            return this.resourceList.get(exec);
         }
         return null;
     }
 
     /**
      * Checks if a executor is part of this topology
-     * @return Boolean whether or not a certain ExecutorDetail is included in the _resourceList.
+     * @return Boolean whether or not a certain ExecutorDetail is included in the resourceList.
      */
     public boolean hasExecInTopo(ExecutorDetails exec) {
-        return _resourceList != null && _resourceList.containsKey(exec);
+        return this.resourceList != null && this.resourceList.containsKey(exec);
     }
 
     /**
@@ -408,22 +410,22 @@ public class TopologyDetails {
             LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
             return;
         }
-        _resourceList.put(exec, resourceList);
+        this.resourceList.put(exec, resourceList);
     }
 
     /**
      * Add default resource requirements for a executor
      */
     public void addDefaultResforExec(ExecutorDetails exec) {
-        Double topologyComponentCpuPcorePercent = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
+        Double topologyComponentCpuPcorePercent = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
         if (topologyComponentCpuPcorePercent == null) {
             LOG.warn("default value for topology.component.cpu.pcore.percent needs to be set!");
         }
-        Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
+        Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
         if (topologyComponentResourcesOffheapMemoryMb == null) {
             LOG.warn("default value for topology.component.resources.offheap.memory.mb needs to be set!");
         }
-        Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
+        Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
         if (topologyComponentResourcesOnheapMemoryMb == null) {
             LOG.warn("default value for topology.component.resources.onheap.memory.mb needs to be set!");
         }
@@ -434,9 +436,9 @@ public class TopologyDetails {
         defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topologyComponentResourcesOnheapMemoryMb);
         LOG.debug("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " +
                         "and CPU requirement: {}",
-                exec, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
-                topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
-                topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
+                exec, this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
+                this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
+                this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
         addResourcesForExec(exec, defaultResourceList);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 8d0df62..be39d2a 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -22,13 +22,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.ArrayList;
 
-import backtype.storm.Config;
 import backtype.storm.scheduler.Topologies;
 import backtype.storm.scheduler.TopologyDetails;
 import org.slf4j.Logger;
@@ -45,79 +43,76 @@ import backtype.storm.scheduler.WorkerSlot;
  */
 public class RAS_Node {
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
-    private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
-    private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
-    private final String _nodeId;
-    private String _hostname;
-    private boolean _isAlive;
-    private SupervisorDetails _sup;
-    private Double _availMemory;
-    private Double _availCPU;
-    private List<WorkerSlot> _slots;
-    private Cluster _cluster;
-    private Topologies _topologies;
+    private Map<String, Set<WorkerSlot>> topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+    private Set<WorkerSlot> freeSlots = new HashSet<WorkerSlot>();
+    private final String nodeId;
+    private String hostname;
+    private boolean isAlive;
+    private SupervisorDetails sup;
+    private Double availMemory;
+    private Double availCPU;
+    private Cluster cluster;
+    private Topologies topologies;
 
     public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
                     SupervisorDetails sup, Cluster cluster, Topologies topologies) {
-        _slots = new ArrayList<WorkerSlot>();
-        _nodeId = nodeId;
-        _isAlive = isAlive;
-        if (_isAlive && allPorts != null) {
+        this.nodeId = nodeId;
+        this.isAlive = isAlive;
+        if (this.isAlive && allPorts != null) {
             for (int port : allPorts) {
-                _freeSlots.add(new WorkerSlot(_nodeId, port));
+                this.freeSlots.add(new WorkerSlot(this.nodeId, port));
             }
-            _sup = sup;
-            _hostname = sup.getHost();
-            _availMemory = this.getTotalMemoryResources();
-            _availCPU = this.getTotalCpuResources();
-            _slots.addAll(_freeSlots);
+            this.sup = sup;
+            this.hostname = sup.getHost();
+            this.availMemory = getTotalMemoryResources();
+            this.availCPU = getTotalCpuResources();
         }
-        this._cluster = cluster;
-        this._topologies = topologies;
+        this.cluster = cluster;
+        this.topologies = topologies;
     }
 
     public String getId() {
-        return _nodeId;
+        return this.nodeId;
     }
 
     public String getHostname() {
-        return _hostname;
+        return this.hostname;
     }
 
     public Collection<WorkerSlot> getFreeSlots() {
-        return _freeSlots;
+        return this.freeSlots;
     }
 
     public Collection<WorkerSlot> getUsedSlots() {
         Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
-        for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
+        for (Collection<WorkerSlot> workers : this.topIdToUsedSlots.values()) {
             ret.addAll(workers);
         }
         return ret;
     }
 
     public boolean isAlive() {
-        return _isAlive;
+        return this.isAlive;
     }
 
     /**
      * @return a collection of the topology ids currently running on this node
      */
     public Collection<String> getRunningTopologies() {
-        return _topIdToUsedSlots.keySet();
+        return this.topIdToUsedSlots.keySet();
     }
 
     public boolean isTotallyFree() {
-        return _topIdToUsedSlots.isEmpty();
+        return this.topIdToUsedSlots.isEmpty();
     }
 
     public int totalSlotsFree() {
-        return _freeSlots.size();
+        return this.freeSlots.size();
     }
 
     public int totalSlotsUsed() {
         int total = 0;
-        for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
+        for (Set<WorkerSlot> slots : this.topIdToUsedSlots.values()) {
             total += slots.size();
         }
         return total;
@@ -129,7 +124,7 @@ public class RAS_Node {
 
     public int totalSlotsUsed(String topId) {
         int total = 0;
-        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
         if (slots != null) {
             total = slots.size();
         }
@@ -137,42 +132,42 @@ public class RAS_Node {
     }
 
     private void validateSlot(WorkerSlot ws) {
-        if (!_nodeId.equals(ws.getNodeId())) {
+        if (!this.nodeId.equals(ws.getNodeId())) {
             throw new IllegalArgumentException(
                     "Trying to add a slot to the wrong node " + ws +
-                            " is not a part of " + _nodeId);
+                            " is not a part of " + this.nodeId);
         }
     }
 
     void addOrphanedSlot(WorkerSlot ws) {
-        if (_isAlive) {
+        if (this.isAlive) {
             throw new IllegalArgumentException("Orphaned Slots " +
                     "only are allowed on dead nodes.");
         }
         validateSlot(ws);
-        if (_freeSlots.contains(ws)) {
+        if (this.freeSlots.contains(ws)) {
             return;
         }
-        for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
+        for (Set<WorkerSlot> used : this.topIdToUsedSlots.values()) {
             if (used.contains(ws)) {
                 return;
             }
         }
-        _freeSlots.add(ws);
+        this.freeSlots.add(ws);
     }
 
     boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
         validateSlot(ws);
-        if (!_freeSlots.remove(ws)) {
+        if (!this.freeSlots.remove(ws)) {
             if (dontThrow) {
                 return true;
             }
             throw new IllegalStateException("Assigning a slot that was not free " + ws);
         }
-        Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> usedSlots = this.topIdToUsedSlots.get(topId);
         if (usedSlots == null) {
             usedSlots = new HashSet<WorkerSlot>();
-            _topIdToUsedSlots.put(topId, usedSlots);
+            this.topIdToUsedSlots.put(topId, usedSlots);
         }
         usedSlots.add(ws);
         return false;
@@ -182,18 +177,18 @@ public class RAS_Node {
      * Free all slots on this node.  This will update the Cluster too.
      */
     public void freeAllSlots() {
-        if (!_isAlive) {
-            LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
-        }
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-            _cluster.freeSlots(entry.getValue());
-            _availCPU = this.getTotalCpuResources();
-            _availMemory = this.getAvailableMemoryResources();
-            if (_isAlive) {
-                _freeSlots.addAll(entry.getValue());
+        if (!this.isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", this.nodeId);
+        }
+        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
+            this.cluster.freeSlots(entry.getValue());
+            this.availCPU = this.getTotalCpuResources();
+            this.availMemory = this.getTotalMemoryResources();
+            if (this.isAlive) {
+                this.freeSlots.addAll(entry.getValue());
             }
         }
-        _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+        this.topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
     }
 
     /**
@@ -201,16 +196,16 @@ public class RAS_Node {
      * @param ws the slot to free
      */
     public void free(WorkerSlot ws) {
-        LOG.info("freeing ws {} on node {}", ws, _hostname);
-        if (_freeSlots.contains(ws)) return;
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+        LOG.info("freeing ws {} on node {}", ws, this.hostname);
+        if (this.freeSlots.contains(ws)) return;
+        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
             Set<WorkerSlot> slots = entry.getValue();
             double memUsed = this.getMemoryUsedByWorker(ws);
             double cpuUsed = this.getCpuUsedByWorker(ws);
             if (slots.remove(ws)) {
-                _cluster.freeSlot(ws);
-                if (_isAlive) {
-                    _freeSlots.add(ws);
+                this.cluster.freeSlot(ws);
+                if (this.isAlive) {
+                    this.freeSlots.add(ws);
                 }
                 this.freeMemory(memUsed);
                 this.freeCPU(cpuUsed);
@@ -218,7 +213,7 @@ public class RAS_Node {
             }
         }
         throw new IllegalArgumentException("Tried to free a slot that was not" +
-                " part of this node " + _nodeId);
+                " part of this node " + this.nodeId);
     }
 
     /**
@@ -226,35 +221,37 @@ public class RAS_Node {
      * @param topId the topology to free slots for
      */
     public void freeTopology(String topId) {
-        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
         if (slots == null || slots.isEmpty()) {
             return;
         }
         for (WorkerSlot ws : slots) {
-            _cluster.freeSlot(ws);
+            this.cluster.freeSlot(ws);
             this.freeMemory(this.getMemoryUsedByWorker(ws));
             this.freeCPU(this.getCpuUsedByWorker(ws));
-            if (_isAlive) {
-                _freeSlots.add(ws);
+            if (this.isAlive) {
+                this.freeSlots.add(ws);
             }
         }
-        _topIdToUsedSlots.remove(topId);
+        this.topIdToUsedSlots.remove(topId);
     }
 
     private void freeMemory(double amount) {
-        _availMemory += amount;
-        LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), _availMemory);
-        if (_availMemory > this.getTotalMemoryResources()) {
+        LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), this.availMemory);
+        if ((this.availMemory + amount) > this.getTotalMemoryResources()) {
             LOG.warn("Freeing more memory than there exists!");
+            return;
         }
+        this.availMemory += amount;
     }
 
     private void freeCPU(double amount) {
-        _availCPU += amount;
-        LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), _availCPU);
-        if (_availCPU > this.getAvailableCpuResources()) {
-            LOG.warn("Freeing more memory than there exists!");
+        LOG.debug("freeing {} CPU on node {}...avail CPU: {}", amount, this.getHostname(), this.availCPU);
+        if ((this.availCPU + amount) > this.getTotalCpuResources()) {
+            LOG.warn("Freeing more CPU than there exists!");
+            return;
         }
+        this.availCPU += amount;
     }
 
     public double getMemoryUsedByWorker(WorkerSlot ws) {
@@ -262,7 +259,7 @@ public class RAS_Node {
         if (topo == null) {
             return 0.0;
         }
-        Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+        Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
         double totalMemoryUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);
@@ -275,7 +272,7 @@ public class RAS_Node {
         if (topo == null) {
             return 0.0;
         }
-        Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+        Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
         double totalCpuUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalCpuUsed += topo.getTotalCpuReqTask(exec);
@@ -284,12 +281,12 @@ public class RAS_Node {
     }
 
     public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
             String topoId = entry.getKey();
             Set<WorkerSlot> workers = entry.getValue();
             for (WorkerSlot worker : workers) {
                 if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
-                    return _topologies.getById(topoId);
+                    return this.topologies.getById(topoId);
                 }
             }
         }
@@ -324,24 +321,24 @@ public class RAS_Node {
     }
 
     public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
-        if (!_isAlive) {
-            throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
+        if (!this.isAlive) {
+            throw new IllegalStateException("Trying to add to a dead node " + this.nodeId);
         }
-        if (_freeSlots.isEmpty()) {
-            throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
+        if (this.freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " + this.nodeId);
         }
         if (executors.size() == 0) {
-            LOG.warn("Trying to assign nothing from " + td.getId() + " to " + _nodeId + " (Ignored)");
+            LOG.warn("Trying to assign nothing from " + td.getId() + " to " + this.nodeId + " (Ignored)");
         }
 
         if (target == null) {
-            target = _freeSlots.iterator().next();
+            target = this.freeSlots.iterator().next();
         }
-        if (!_freeSlots.contains(target)) {
-            throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
+        if (!this.freeSlots.contains(target)) {
+            throw new IllegalStateException("Trying to assign already used slot " + target.getPort() + " on node " + this.nodeId);
         } else {
             allocateResourceToSlot(td, executors, target);
-            _cluster.assign(target, td.getId(), executors);
+            this.cluster.assign(target, td.getId(), executors);
             assignInternal(target, td.getId(), false);
         }
     }
@@ -359,21 +356,21 @@ public class RAS_Node {
     @Override
     public boolean equals(Object other) {
         if (other instanceof RAS_Node) {
-            return _nodeId.equals(((RAS_Node) other)._nodeId);
+            return this.nodeId.equals(((RAS_Node) other).nodeId);
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return _nodeId.hashCode();
+        return this.nodeId.hashCode();
     }
 
     @Override
     public String toString() {
-        return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
-                + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
-                + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
+        return "{Node: " + ((this.sup == null) ? "null (possibly down)" : this.sup.getHost())
+                + ", AvailMem: " + ((this.availMemory == null) ? "N/A" : this.availMemory.toString())
+                + ", AvailCPU: " + ((this.availCPU == null) ? "N/A" : this.availCPU.toString()) + "}";
     }
 
     public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
@@ -436,7 +433,7 @@ public class RAS_Node {
      * @param amount the amount to set as available memory
      */
     public void setAvailableMemory(Double amount) {
-        _availMemory = amount;
+        this.availMemory = amount;
     }
 
     /**
@@ -444,10 +441,10 @@ public class RAS_Node {
      * @return the available memory for this node
      */
     public Double getAvailableMemoryResources() {
-        if (_availMemory == null) {
+        if (this.availMemory == null) {
             return 0.0;
         }
-        return _availMemory;
+        return this.availMemory;
     }
 
     /**
@@ -455,8 +452,8 @@ public class RAS_Node {
      * @return the total memory for this node
      */
     public Double getTotalMemoryResources() {
-        if (_sup != null && _sup.getTotalMemory() != null) {
-            return _sup.getTotalMemory();
+        if (this.sup != null && this.sup.getTotalMemory() != null) {
+            return this.sup.getTotalMemory();
         } else {
             return 0.0;
         }
@@ -468,12 +465,12 @@ public class RAS_Node {
      * @return the current available memory for this node after consumption
      */
     public Double consumeMemory(Double amount) {
-        if (amount > _availMemory) {
-            LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
+        if (amount > this.availMemory) {
+            LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, this.availMemory);
             return null;
         }
-        _availMemory = _availMemory - amount;
-        return _availMemory;
+        this.availMemory = this.availMemory - amount;
+        return this.availMemory;
     }
 
     /**
@@ -481,10 +478,10 @@ public class RAS_Node {
      * @return the available cpu for this node
      */
     public Double getAvailableCpuResources() {
-        if (_availCPU == null) {
+        if (this.availCPU == null) {
             return 0.0;
         }
-        return _availCPU;
+        return this.availCPU;
     }
 
     /**
@@ -492,8 +489,8 @@ public class RAS_Node {
      * @return the total cpu for this node
      */
     public Double getTotalCpuResources() {
-        if (_sup != null && _sup.getTotalCPU() != null) {
-            return _sup.getTotalCPU();
+        if (this.sup != null && this.sup.getTotalCPU() != null) {
+            return this.sup.getTotalCPU();
         } else {
             return 0.0;
         }
@@ -505,12 +502,12 @@ public class RAS_Node {
      * @return the current available cpu for this node after consumption
      */
     public Double consumeCPU(Double amount) {
-        if (amount > _availCPU) {
-            LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
+        if (amount > this.availCPU) {
+            LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, this.availCPU);
             return null;
         }
-        _availCPU = _availCPU - amount;
-        return _availCPU;
+        this.availCPU = this.availCPU - amount;
+        return this.availCPU;
     }
 
     /**
@@ -526,6 +523,6 @@ public class RAS_Node {
     }
 
     public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
-        return _topIdToUsedSlots;
+        return this.topIdToUsedSlots;
     }
 }

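Note on the freeMemory/freeCPU hunks above: the guard only works if the amount being returned is checked against the node's total capacity for the same resource, and the available amount is updated only after the check passes. The following is a minimal sketch of that pattern, not the committed code. NodeBudget and its methods are hypothetical names introduced for illustration and do not exist in Storm.

```java
// Illustrative sketch only: NodeBudget is a hypothetical stand-in, not a Storm class.
final class NodeBudget {
    private final double total;   // capacity of the node for one resource (assumed fixed)
    private double available;     // capacity not yet handed out to workers

    NodeBudget(double total) {
        this.total = total;
        this.available = total;
    }

    /** Takes amount from the budget; returns false and changes nothing if it does not fit. */
    boolean consume(double amount) {
        if (amount > available) {
            return false;
        }
        available -= amount;
        return true;
    }

    /** Returns amount to the budget; returns false and changes nothing if it would exceed the total. */
    boolean free(double amount) {
        if (available + amount > total) {
            return false;
        }
        available += amount;
        return true;
    }

    double getAvailable() {
        return available;
    }

    public static void main(String[] args) {
        NodeBudget memory = new NodeBudget(1024.0);
        System.out.println(memory.consume(512.0));  // true
        System.out.println(memory.free(768.0));     // false: would exceed the total
        System.out.println(memory.free(512.0));     // true
        System.out.println(memory.getAvailable());  // 1024.0
    }
}
```

Comparing against the currently available amount instead of the total would reject every positive amount, so nothing would ever be freed.
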
http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
index 5a99bdd..abba0b0 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -65,7 +65,7 @@ public class RAS_Nodes {
                     nodeIdToNode.put(id, node);
                 }
                 if (!node.isAlive()) {
-                    //The supervisor on the node down so add an orphaned slot to hold the unsupervised worker
+                    //The supervisor on the node is down so add an orphaned slot to hold the unsupervised worker
                     node.addOrphanedSlot(workerSlot);
                 }
                 if (node.assignInternal(workerSlot, topId, true)) {
@@ -113,7 +113,8 @@ public class RAS_Nodes {
                     if (topoMemoryResourceList.containsKey(exec)) {
                         node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
                     } else {
-                        LOG.warn("Resource Req not found...Scheduling Task{} with memory requirement as on heap - {} and off heap - {} and CPU requirement as {}",
+                        LOG.warn("Resource Req not found...Scheduling Task {} with memory requirement as on heap - {} " +
+                                        "and off heap - {} and CPU requirement as {}",
                                 exec,
                                 Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
                                 Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index c2e2fcd..0558e12 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -60,15 +60,13 @@ public class ResourceAwareScheduler implements IScheduler {
                 User user = userMapEntry.getValue();
                 this.userMap.put(userId, user.getCopy());
             }
-            this.cluster = cluster.getCopy();
-            this.topologies = topologies.getCopy();
+            this.cluster = Cluster.getCopy(cluster);
+            this.topologies = Topologies.getCopy(topologies);
             this.nodes = new RAS_Nodes(this.cluster, this.topologies);
             this.conf.putAll(conf);
-
         }
     }
 
-
     @SuppressWarnings("rawtypes")
     private Map conf;
 
@@ -83,7 +81,7 @@ public class ResourceAwareScheduler implements IScheduler {
 
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
-        LOG.info("\n\n\nRerunning ResourceAwareScheduler...");
+        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
         //initialize data structures
         this.initialize(topologies, cluster);
         //logs everything that is currently scheduled and the location at which they are scheduled
@@ -114,7 +112,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 //Call scheduling priority strategy
                 td = schedulingPrioritystrategy.getNextTopologyToSchedule();
             } catch (Exception e) {
-                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrack: {}"
+                LOG.error("Exception thrown when running priority strategy {}. No topologies will be scheduled! Error: {} StackTrace: {}"
                         , schedulingPrioritystrategy.getClass().getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
                 break;
             }
@@ -153,7 +151,7 @@ public class ResourceAwareScheduler implements IScheduler {
                     rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                     result = rasStrategy.schedule(td);
                 } catch (Exception e) {
-                    LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: {}"
+                    LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrace: {}"
                             , rasStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
                     this.restoreCheckpointSchedulingState(schedulingState);
                     //since state is restored need the update User topologySubmitter to the new User object in userMap
@@ -205,7 +203,7 @@ public class ResourceAwareScheduler implements IScheduler {
                                 evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
                                 madeSpace = evictionStrategy.makeSpaceForTopo(td);
                             } catch (Exception e) {
-                                LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrack: {}"
+                                LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}. No evictions will be done! Error: {} StackTrace: {}"
                                         , evictionStrategy.getClass().getName(), td.getName(), e.getClass().getName(), Arrays.toString(e.getStackTrace()));
                                 this.restoreCheckpointSchedulingState(schedulingState);
                                 //since state is restored need the update User topologySubmitter to the new User object in userMap

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 81c2abc..1812580 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -61,7 +61,6 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
         //user has enough resource under his or her resource guarantee to schedule topology
         if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
             if (evictUser != null) {
-
                 TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
                 evictTopology(topologyEvict);
                 return true;

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 990ccd6..0d891ff 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -60,7 +60,6 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
         for (User user : this.userMap.values()) {
             if (user.hasTopologyNeedSchedule()) {
                 Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
-
                 if (least > userResourcePoolAverageUtilization) {
                     ret = user;
                     least = userResourcePoolAverageUtilization;

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 75cc5eb..df4ae14 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -170,7 +170,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             LOG.error("Not all executors successfully scheduled: {}",
                     executorsNotScheduled);
             schedulerAssignmentMap = null;
-            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, (td.getExecutors().size() - unassignedExecutors.size()) + "/" + td.getExecutors().size() + " executors scheduled");
+            result = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+                    (td.getExecutors().size() - unassignedExecutors.size()) + "/" + td.getExecutors().size() + " executors scheduled");
         } else {
             LOG.debug("All resources successfully scheduled!");
             result = SchedulingResult.successWithMsg(schedulerAssignmentMap, "Fully Scheduled by DefaultResourceAwareStrategy");

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index bb2e955..a7ac5c9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -47,8 +47,8 @@ public interface IStrategy {
      * @param td
      * @return returns a SchedulingResult object containing SchedulingStatus object to indicate whether scheduling is successful
      * The strategy must calculate a scheduling in the format of Map<WorkerSlot, Collection<ExecutorDetails>> where the key of
-     * this map is the worker slot that the value (collection of executors) should be assigned to. if a scheduling is calculated successfully,
-     * put the scheduling map in the SchedulingResult object.
+     * this map is the worker slot that the value (collection of executors) should be assigned to.
+     * If a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
      */
     public SchedulingResult schedule(TopologyDetails td);
 }

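Note on the IStrategy javadoc above: the scheduling a strategy returns is a map whose keys are worker slots and whose values are the executors to place on each slot. The sketch below builds a map of that shape with a naive round-robin policy. It is not the policy used by DefaultResourceAwareStrategy, and Slot and Exec are hypothetical stand-ins for Storm's WorkerSlot and ExecutorDetails so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: Slot and Exec are hypothetical stand-ins for
// Storm's WorkerSlot and ExecutorDetails, and round-robin is an assumed policy.
final class SchedulingMapSketch {
    static final class Slot {
        final String nodeId;
        final int port;

        Slot(String nodeId, int port) {
            this.nodeId = nodeId;
            this.port = port;
        }
    }

    static final class Exec {
        final int startTask;
        final int endTask;

        Exec(int startTask, int endTask) {
            this.startTask = startTask;
            this.endTask = endTask;
        }
    }

    /**
     * Spreads executors over slots in round-robin order.
     * Key: the slot; value: the executors assigned to that slot.
     * Slots are compared by identity here, which is enough for this sketch.
     */
    static Map<Slot, Collection<Exec>> roundRobin(List<Slot> slots, List<Exec> execs) {
        Map<Slot, Collection<Exec>> scheduling = new LinkedHashMap<>();
        if (slots.isEmpty()) {
            return scheduling; // nothing can be placed without slots
        }
        for (int i = 0; i < execs.size(); i++) {
            Slot slot = slots.get(i % slots.size());
            scheduling.computeIfAbsent(slot, s -> new ArrayList<>()).add(execs.get(i));
        }
        return scheduling;
    }
}
```

A real strategy would also check each slot's node for enough free memory and CPU before adding an executor, and report a failure if some executors cannot be placed.
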
http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
index 57c9f40..6df0108 100644
--- a/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/validation/ConfigValidation.java
@@ -540,7 +540,8 @@ public class ConfigValidation {
             try {
                 Class objectClass = Class.forName((String) o);
                 if (!this.classImplements.isAssignableFrom(objectClass)) {
-                    throw new IllegalArgumentException("Field " + name + " with value " + o + " does not implement " + this.classImplements.getName());
+                    throw new IllegalArgumentException("Field " + name + " with value " + o
+                            + " does not implement " + this.classImplements.getName());
                 }
             } catch (ClassNotFoundException e) {
                 throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index d42915b..f9b4cd6 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -95,7 +95,6 @@ public class TestResourceAwareScheduler {
         TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
         TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
 
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo1.getId(), topo1);
         topoMap.put(topo2.getId(), topo2);
@@ -360,7 +359,6 @@ public class TestResourceAwareScheduler {
         TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
         TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
 
-
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
         TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
@@ -468,19 +466,16 @@ public class TestResourceAwareScheduler {
 
         TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
 
-
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
 
         TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
         TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
 
-
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
 
         TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
         TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 29);
 
-
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
         topoMap.put(topo2.getId(), topo2);
         topoMap.put(topo3.getId(), topo3);
@@ -747,7 +742,6 @@ public class TestResourceAwareScheduler {
         resourceUserPool.get("bobby").put("cpu", 100.0);
         resourceUserPool.get("bobby").put("memory", 1000.0);
 
-
         resourceUserPool.put("derek", new HashMap<String, Number>());
         resourceUserPool.get("derek").put("cpu", 100.0);
         resourceUserPool.get("derek").put("memory", 1000.0);
@@ -907,7 +901,6 @@ public class TestResourceAwareScheduler {
         resourceUserPool.get("bobby").put("cpu", 100.0);
         resourceUserPool.get("bobby").put("memory", 1000.0);
 
-
         resourceUserPool.put("derek", new HashMap<String, Number>());
         resourceUserPool.get("derek").put("cpu", 25.0);
         resourceUserPool.get("derek").put("memory", 250.0);
@@ -951,7 +944,6 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
 
-
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
             Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
@@ -1090,7 +1082,6 @@ public class TestResourceAwareScheduler {
         topoMap.put(topo5.getId(), topo5);
         topoMap.put(topo6.getId(), topo6);
 
-
         Topologies topologies = new Topologies(topoMap);
 
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
@@ -1106,7 +1097,6 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
 
-
         for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
             Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7f69135/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 2098f0c..dcd487f 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -185,7 +185,6 @@ public class TestUtilsForResourceAwareScheduler {
         }
 
         public void close() {
-
         }
 
         public void nextTuple() {
@@ -197,11 +196,9 @@ public class TestUtilsForResourceAwareScheduler {
         }
 
         public void ack(Object msgId) {
-
         }
 
         public void fail(Object msgId) {
-
         }
 
         public void declareOutputFields(OutputFieldsDeclarer declarer) {