You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/29 18:36:08 UTC

[1/2] storm git commit: Merge branch 'ras_refactor' of https://github.com/jerrypeng/storm into STORM-1634

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 1cfaf1245 -> 20ba5189f


Merge branch 'ras_refactor' of https://github.com/jerrypeng/storm into STORM-1634

STORM-1634: Refactoring of Resource Aware Scheduler


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf99ae51
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf99ae51
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf99ae51

Branch: refs/heads/1.x-branch
Commit: bf99ae51019c53135cbc23c7465440fc63b1299a
Parents: 1cfaf12
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 29 11:10:55 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 29 11:12:14 2016 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/scheduler/Cluster.java |  23 ++--
 .../org/apache/storm/scheduler/Topologies.java  |  11 +-
 .../scheduler/resource/ClusterStateData.java    | 101 --------------
 .../resource/ResourceAwareScheduler.java        | 138 ++++++++-----------
 .../scheduler/resource/SchedulingState.java     |  56 ++++++++
 .../apache/storm/scheduler/resource/User.java   |  24 ++--
 .../eviction/DefaultEvictionStrategy.java       |  10 +-
 .../strategies/eviction/IEvictionStrategy.java  |   9 +-
 .../DefaultSchedulingPriorityStrategy.java      |   9 +-
 .../priority/ISchedulingPriorityStrategy.java   |   9 +-
 .../DefaultResourceAwareStrategy.java           |  86 ++++++------
 .../strategies/scheduling/IStrategy.java        |   6 +-
 .../resource/TestResourceAwareScheduler.java    |  45 ++++++
 13 files changed, 247 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index 4ac4eaf..a6622ce 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -92,18 +92,17 @@ public class Cluster {
     }
 
     /**
-     * Get a copy of this cluster object
-     */
-    public static Cluster getCopy(Cluster cluster) {
-        HashMap<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
-        for (Map.Entry<String, SchedulerAssignmentImpl> entry : cluster.assignments.entrySet()) {
-            newAssignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
-        }
-        Map newConf = new HashMap<String, Object>();
-        newConf.putAll(cluster.conf);
-        Cluster copy = new Cluster(cluster.inimbus, cluster.supervisors, newAssignments, newConf);
-        copy.status = new HashMap<>(cluster.status);
-        return copy;
+     * Copy constructor
+     */
+    public Cluster(Cluster src) {
+        this(src.inimbus, src.supervisors, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap<String, Object>(src.conf));
+        this.supervisorsResources.putAll(src.supervisorsResources);
+        for (Map.Entry<String, SchedulerAssignmentImpl> entry : src.assignments.entrySet()) {
+            this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
+        }
+        this.status.putAll(src.status);
+        this.topologyResources.putAll(src.topologyResources);
+        this.blackListedHosts.addAll(src.blackListedHosts);
     }
     
     public void setBlacklistedHosts(Set<String> hosts) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java b/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
index f9478ab..82cb790 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
@@ -39,6 +39,13 @@ public class Topologies {
             this.nameToId.put(topology.getName(), entry.getKey());
         }
     }
+
+    /**
+     * copy constructor
+     */
+    public Topologies(Topologies src) {
+        this(src.topologies);
+    }
     
     public TopologyDetails getById(String topologyId) {
         return this.topologies.get(topologyId);
@@ -68,10 +75,6 @@ public class Topologies {
         return _allComponents;
     }
 
-    public static Topologies getCopy(Topologies topologies) {
-        return new Topologies(topologies.topologies);
-    }
-
     @Override
     public String toString() {
         StringBuilder ret = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
deleted file mode 100644
index ece2800..0000000
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.Topologies;
-import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A class to specify which data and API to expose to a scheduling strategy
- */
-public class ClusterStateData {
-
-    private final Cluster cluster;
-
-    public final Topologies topologies;
-
-    // Information regarding all nodes in the cluster
-    public Map<String, NodeDetails> nodes = new HashMap<String, NodeDetails>();
-
-    public static final class NodeDetails {
-
-        private final RAS_Node node;
-
-        public NodeDetails(RAS_Node node) {
-            this.node = node;
-        }
-
-        public String getId() {
-            return this.node.getId();
-        }
-
-        public String getHostname() {
-            return this.node.getHostname();
-        }
-
-        public Collection<WorkerSlot> getFreeSlots() {
-            return this.node.getFreeSlots();
-        }
-
-        public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
-            this.node.consumeResourcesforTask(exec, topo);
-        }
-
-        public Double getAvailableMemoryResources() {
-            return this.node.getAvailableMemoryResources();
-        }
-
-        public Double getAvailableCpuResources() {
-            return this.node.getAvailableCpuResources();
-        }
-
-        public Double getTotalMemoryResources() {
-            return this.node.getTotalMemoryResources();
-        }
-
-        public Double getTotalCpuResources() {
-            return this.node.getTotalCpuResources();
-        }
-    }
-
-    public ClusterStateData(Cluster cluster, Topologies topologies) {
-        this.cluster = cluster;
-        this.topologies = topologies;
-        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
-        for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
-            this.nodes.put(entry.getKey(), new NodeDetails(entry.getValue()));
-        }
-    }
-
-    public Collection<ExecutorDetails> getUnassignedExecutors(String topoId) {
-        return this.cluster.getUnassignedExecutors(this.topologies.getById(topoId));
-    }
-
-    public Map<String, List<String>> getNetworkTopography() {
-        return this.cluster.getNetworkTopography();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 2b35d6b..087fe6b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -41,30 +41,8 @@ import java.util.Set;
 
 public class ResourceAwareScheduler implements IScheduler {
 
-    private Map<String, User> userMap;
-    private Cluster cluster;
-    private Topologies topologies;
-    private RAS_Nodes nodes;
-
-    private class SchedulingState {
-        private Map<String, User> userMap = new HashMap<String, User>();
-        private Cluster cluster;
-        private Topologies topologies;
-        private RAS_Nodes nodes;
-        private Map conf = new Config();
-
-        public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
-            for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
-                String userId = userMapEntry.getKey();
-                User user = userMapEntry.getValue();
-                this.userMap.put(userId, user.getCopy());
-            }
-            this.cluster = Cluster.getCopy(cluster);
-            this.topologies = topologies.getCopy(topologies);
-            this.nodes = new RAS_Nodes(this.cluster, this.topologies);
-            this.conf.putAll(conf);
-        }
-    }
+    // Object that holds the current scheduling state
+    private SchedulingState schedulingState;
 
     @SuppressWarnings("rawtypes")
     private Map conf;
@@ -86,7 +64,7 @@ public class ResourceAwareScheduler implements IScheduler {
         //logs everything that is currently scheduled and the location at which they are scheduled
         LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
         //logs the resources available/used for every node
-        LOG.info("Nodes:\n{}", this.nodes);
+        LOG.info("Nodes:\n{}", this.schedulingState.nodes);
         //logs the detailed info about each user
         for (User user : getUserMap().values()) {
             LOG.info(user.getDetailedInfo());
@@ -104,10 +82,10 @@ public class ResourceAwareScheduler implements IScheduler {
                     break;
                 }
             }
-            TopologyDetails td = null;
+            TopologyDetails td;
             try {
                 //need to re prepare since scheduling state might have been restored
-                schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                schedulingPrioritystrategy.prepare(this.schedulingState);
                 //Call scheduling priority strategy
                 td = schedulingPrioritystrategy.getNextTopologyToSchedule();
             } catch (Exception ex) {
@@ -120,15 +98,27 @@ public class ResourceAwareScheduler implements IScheduler {
             }
             scheduleTopology(td);
 
-            LOG.debug("Nodes after scheduling:\n{}", this.nodes);
+            LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
         }
+
+        //update changes to cluster
+        updateChanges(cluster, topologies);
+    }
+
+    private void updateChanges(Cluster cluster, Topologies topologies) {
+        //Cannot simply replace the passed-in cluster with schedulingState.cluster since clojure is immutable; copy the state over instead
+        cluster.setAssignments(schedulingState.cluster.getAssignments());
+        cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts());
+        cluster.setStatusMap(schedulingState.cluster.getStatusMap());
+        cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
+        cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
         //updating resources used by supervisor
-        updateSupervisorsResources(this.cluster, this.topologies);
+        updateSupervisorsResources(cluster, topologies);
     }
 
     public void scheduleTopology(TopologyDetails td) {
-        User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
-        if (cluster.getUnassignedExecutors(td).size() > 0) {
+        User topologySubmitter = this.schedulingState.userMap.get(td.getTopologySubmitter());
+        if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 0) {
             LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
 
             SchedulingState schedulingState = checkpointSchedulingState();
@@ -140,7 +130,7 @@ public class ResourceAwareScheduler implements IScheduler {
                         td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
                 topologySubmitter = cleanup(schedulingState, td);
                 topologySubmitter.moveTopoFromPendingToInvalid(td);
-                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+                this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
                         + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
                 return;
             }
@@ -148,15 +138,17 @@ public class ResourceAwareScheduler implements IScheduler {
             while (true) {
                 SchedulingResult result = null;
                 try {
-                    //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
-                    rasStrategy.prepare(new ClusterStateData(this.cluster, this.topologies));
+                    // Need to re-prepare scheduling strategy with cluster and topologies in case scheduling state was restored
+                    // Pass in a copy of scheduling state since the scheduling strategy should not be able to make modifications to
+                    // the state of cluster directly
+                    rasStrategy.prepare(new SchedulingState(this.schedulingState));
                     result = rasStrategy.schedule(td);
                 } catch (Exception ex) {
                     LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
                             , rasStrategy.getClass().getName(), td.getName()), ex);
                     topologySubmitter = cleanup(schedulingState, td);
                     topologySubmitter.moveTopoFromPendingToInvalid(td);
-                    this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+                    this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
                             + rasStrategy.getClass().getName() + ". Please check logs for details");
                 }
                 LOG.debug("scheduling result: {}", result);
@@ -165,17 +157,17 @@ public class ResourceAwareScheduler implements IScheduler {
                         try {
                             if (mkAssignment(td, result.getSchedulingResultMap())) {
                                 topologySubmitter.moveTopoFromPendingToRunning(td);
-                                this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
+                                this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
                             } else {
                                 topologySubmitter = this.cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
+                                this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
                             }
                         } catch (IllegalStateException ex) {
                             LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
                             topologySubmitter = cleanup(schedulingState, td);
                             topologySubmitter.moveTopoFromPendingToAttempted(td);
-                            this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
+                            this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                         }
                         break;
                     } else {
@@ -193,7 +185,7 @@ public class ResourceAwareScheduler implements IScheduler {
                             boolean madeSpace = false;
                             try {
                                 //need to re prepare since scheduling state might have been restored
-                                evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                                evictionStrategy.prepare(this.schedulingState);
                                 madeSpace = evictionStrategy.makeSpaceForTopo(td);
                             } catch (Exception ex) {
                                 LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
@@ -206,32 +198,32 @@ public class ResourceAwareScheduler implements IScheduler {
                                 LOG.debug("Could not make space for topo {} will move to attempted", td);
                                 topologySubmitter = cleanup(schedulingState, td);
                                 topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
+                                this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
                                 break;
                             }
                             continue;
                         } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
                             topologySubmitter = cleanup(schedulingState, td);
-                            topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                            topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
                             break;
                         } else {
                             topologySubmitter = cleanup(schedulingState, td);
-                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+                            topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster);
                             break;
                         }
                     }
                 } else {
                     LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
                     topologySubmitter = cleanup(schedulingState, td);
-                    topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+                    topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
                     break;
                 }
             }
         } else {
             LOG.warn("Topology {} is already fully scheduled!", td.getName());
             topologySubmitter.moveTopoFromPendingToRunning(td);
-            if (this.cluster.getStatusMap().get(td.getId()) == null || this.cluster.getStatusMap().get(td.getId()).equals("")) {
-                this.cluster.setStatus(td.getId(), "Fully Scheduled");
+            if (this.schedulingState.cluster.getStatusMap().get(td.getId()) == null || this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) {
+                this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled");
             }
         }
     }
@@ -239,7 +231,7 @@ public class ResourceAwareScheduler implements IScheduler {
     private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
         restoreCheckpointSchedulingState(schedulingState);
         //since state is restored need the update User topologySubmitter to the new User object in userMap
-        return this.userMap.get(td.getTopologySubmitter());
+        return this.schedulingState.userMap.get(td.getTopologySubmitter());
     }
 
     private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
@@ -255,7 +247,7 @@ public class ResourceAwareScheduler implements IScheduler {
             for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
                 WorkerSlot targetSlot = workerToTasksEntry.getKey();
                 Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
-                RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
+                RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
 
                 targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
 
@@ -282,7 +274,7 @@ public class ResourceAwareScheduler implements IScheduler {
                     td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
             //updating resources used for a topology
-            this.cluster.setTopologyResources(td.getId(), resources);
+            this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
             return true;
         } else {
             LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
@@ -327,11 +319,11 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     public User getUser(String user) {
-        return this.userMap.get(user);
+        return this.schedulingState.userMap.get(user);
     }
 
     public Map<String, User> getUserMap() {
-        return this.userMap;
+        return this.schedulingState.userMap;
     }
 
     /**
@@ -340,8 +332,8 @@ public class ResourceAwareScheduler implements IScheduler {
      * @param topologies
      * @param cluster
      */
-    private void initUsers(Topologies topologies, Cluster cluster) {
-        this.userMap = new HashMap<String, User>();
+    private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
+        Map<String, User> userMap = new HashMap<String, User>();
         Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
         LOG.debug("userResourcePools: {}", userResourcePools);
 
@@ -353,27 +345,26 @@ public class ResourceAwareScheduler implements IScheduler {
                 LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", td.getName());
                 continue;
             }
-            if (!this.userMap.containsKey(topologySubmitter)) {
-                this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
+            if (!userMap.containsKey(topologySubmitter)) {
+                userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
             }
             if (cluster.getUnassignedExecutors(td).size() > 0) {
                 LOG.debug("adding td: {} to pending queue", td.getName());
-                this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
+                userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
             } else {
                 LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
-                this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
+                userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
                 if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
                     cluster.setStatus(td.getId(), "Fully Scheduled");
                 }
             }
         }
+        return userMap;
     }
 
     private void initialize(Topologies topologies, Cluster cluster) {
-        this.cluster = cluster;
-        this.topologies = topologies;
-        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
-        initUsers(topologies, cluster);
+        Map<String, User> userMap = getUsers(topologies, cluster);
+        this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf);
     }
 
     /**
@@ -412,35 +403,24 @@ public class ResourceAwareScheduler implements IScheduler {
 
     private SchedulingState checkpointSchedulingState() {
         LOG.debug("/*********Checkpoint scheduling state************/");
-        for (User user : getUserMap().values()) {
+        for (User user : this.schedulingState.userMap.values()) {
             LOG.debug(user.getDetailedInfo());
         }
-        LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
-        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
+        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
         LOG.debug("/*********End************/");
-        return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
+        return new SchedulingState(this.schedulingState);
     }
 
     private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
         LOG.debug("/*********restoring scheduling state************/");
         //reseting cluster
-        //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
-        this.cluster.setAssignments(schedulingState.cluster.getAssignments());
-        this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
-        this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
-        this.cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
-        //don't need to explicitly set data structues like Cluster since nothing can really be changed
-        //unless this.topologies is set to another object
-        this.topologies = schedulingState.topologies;
-        this.conf = schedulingState.conf;
-        this.userMap = schedulingState.userMap;
-        this.nodes = schedulingState.nodes;
-
-        for (User user : getUserMap().values()) {
+        this.schedulingState = schedulingState;
+        for (User user : this.schedulingState.userMap.values()) {
             LOG.debug(user.getDetailedInfo());
         }
-        LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
-        LOG.debug("nodes:\n{}", this.nodes);
+        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
+        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
         LOG.debug("/*********End************/");
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
new file mode 100644
index 0000000..8a28ac0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that holds the complete scheduling state of the Resource Aware Scheduler
+ */
+public class SchedulingState {
+    public final Map<String, User> userMap = new HashMap<String, User>();
+    public final Cluster cluster;
+    public final Topologies topologies;
+    public final RAS_Nodes nodes;
+    public final Map conf = new Config();
+
+    public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, Map conf) {
+        for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+            String userId = userMapEntry.getKey();
+            User user = userMapEntry.getValue();
+            this.userMap.put(userId, new User(user));
+        }
+        this.cluster = new Cluster(cluster);
+        this.topologies = new Topologies(topologies);
+        this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+        this.conf.putAll(conf);
+    }
+
+    /**
+     * copy constructor
+     */
+    public SchedulingState(SchedulingState src) {
+        this(src.userMap, src.cluster, src.topologies, src.conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
index 9d450ab..0f5a563 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
@@ -65,21 +65,23 @@ public class User {
         }
     }
 
-    public User getCopy() {
-        User newUser = new User(this.userId, this.resourcePool);
-        for (TopologyDetails topo : this.pendingQueue) {
-            newUser.addTopologyToPendingQueue(topo);
+    /**
+     * Copy Constructor
+     */
+    public User(User src) {
+        this(src.userId, src.resourcePool);
+        for (TopologyDetails topo : src.pendingQueue) {
+            addTopologyToPendingQueue(topo);
         }
-        for (TopologyDetails topo : this.runningQueue) {
-            newUser.addTopologyToRunningQueue(topo);
+        for (TopologyDetails topo : src.runningQueue) {
+            addTopologyToRunningQueue(topo);
         }
-        for (TopologyDetails topo : this.attemptedQueue) {
-            newUser.addTopologyToAttemptedQueue(topo);
+        for (TopologyDetails topo : src.attemptedQueue) {
+            addTopologyToAttemptedQueue(topo);
         }
-        for (TopologyDetails topo : this.invalidQueue) {
-            newUser.addTopologyToInvalidQueue(topo);
+        for (TopologyDetails topo : src.invalidQueue) {
+            addTopologyToInvalidQueue(topo);
         }
-        return newUser;
     }
 
     public String getId() {

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 91f0058..182017b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -19,10 +19,10 @@
 package org.apache.storm.scheduler.resource.strategies.eviction;
 
 import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,10 +39,10 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
     private RAS_Nodes nodes;
 
     @Override
-    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
-        this.cluster = cluster;
-        this.userMap = userMap;
-        this.nodes = nodes;
+    public void prepare(SchedulingState schedulingState) {
+        this.cluster = schedulingState.cluster;
+        this.userMap = schedulingState.userMap;
+        this.nodes = schedulingState.nodes;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
index e8ba3a9..9499424 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -18,20 +18,15 @@
 
 package org.apache.storm.scheduler.resource.strategies.eviction;
 
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.User;
-
-import java.util.Map;
+import org.apache.storm.scheduler.resource.SchedulingState;
 
 public interface IEvictionStrategy {
 
     /**
      * Initialization
      */
-    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+    public void prepare(SchedulingState schedulingState);
 
     /**
      * This method when invoked should attempt to make space on the cluster so that the topology specified can be scheduled

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 57ef3ca..e3109d5 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -19,9 +19,8 @@
 package org.apache.storm.scheduler.resource.strategies.priority;
 
 import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,9 +35,9 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
     private Map<String, User> userMap;
 
     @Override
-    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
-        this.cluster = cluster;
-        this.userMap = userMap;
+    public void prepare(SchedulingState schedulingState) {
+        this.cluster = schedulingState.cluster;
+        this.userMap = schedulingState.userMap;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index 63ab919..ffb463f 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -18,20 +18,15 @@
 
 package org.apache.storm.scheduler.resource.strategies.priority;
 
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.User;
-
-import java.util.Map;
+import org.apache.storm.scheduler.resource.SchedulingState;
 
 public interface ISchedulingPriorityStrategy {
 
     /**
      * initializes
      */
-    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+    public void prepare(SchedulingState schedulingState);
 
     /**
      * Gets the next topology to schedule

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9ecba47..9a12f80 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -30,9 +30,12 @@ import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Iterator;
 
-import org.apache.storm.scheduler.resource.ClusterStateData.NodeDetails;
-import org.apache.storm.scheduler.resource.ClusterStateData;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
 import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,25 +47,21 @@ import org.apache.storm.scheduler.resource.Component;
 
 public class DefaultResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
-    private ClusterStateData _clusterStateData;
-    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
-    private Map<String, NodeDetails> _availNodes;
-    private NodeDetails refNode = null;
-    /**
-     * supervisor id -> Node
-     */
-    private Map<String, NodeDetails> _nodes;
+    private Cluster _cluster;
+    private Topologies _topologies;
+    private RAS_Node refNode = null;
     private Map<String, List<String>> _clusterInfo;
+    private RAS_Nodes _nodes;
 
     private final double CPU_WEIGHT = 1.0;
     private final double MEM_WEIGHT = 1.0;
     private final double NETWORK_WEIGHT = 1.0;
 
-    public void prepare (ClusterStateData clusterStateData) {
-        _clusterStateData = clusterStateData;
-        _nodes = clusterStateData.nodes;
-        _availNodes = this.getAvailNodes();
-        _clusterInfo = _clusterStateData.getNetworkTopography();
+    public void prepare (SchedulingState schedulingState) {
+        _cluster = schedulingState.cluster;
+        _topologies = schedulingState.topologies;
+        _nodes = schedulingState.nodes;
+        _clusterInfo = schedulingState.cluster.getNetworkTopography();
         LOG.debug(this.getClusterInfo());
     }
 
@@ -84,11 +83,11 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     }
 
     public SchedulingResult schedule(TopologyDetails td) {
-        if (_availNodes.size() <= 0) {
+        if (_nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
             return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
         }
-        Collection<ExecutorDetails> unassignedExecutors = _clusterStateData.getUnassignedExecutors(td.getId());
+        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
         Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
@@ -149,7 +148,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
         WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
         if (targetSlot != null) {
-            NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
+            RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
             if (!schedulerAssignmentMap.containsKey(targetSlot)) {
                 schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
             }
@@ -189,7 +188,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         double taskMem = td.getTotalMemReqTask(exec);
         double taskCPU = td.getTotalCpuReqTask(exec);
-        List<NodeDetails> nodes;
+        List<RAS_Node> nodes;
         if(clusterId != null) {
             nodes = this.getAvailableNodesFromCluster(clusterId);
             
@@ -197,8 +196,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             nodes = this.getAvailableNodes();
         }
         //First sort nodes by distance
-        TreeMap<Double, NodeDetails> nodeRankMap = new TreeMap<>();
-        for (NodeDetails n : nodes) {
+        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
+        for (RAS_Node n : nodes) {
             if(n.getFreeSlots().size()>0) {
                 if (n.getAvailableMemoryResources() >= taskMem
                         && n.getAvailableCpuResources() >= taskCPU) {
@@ -217,8 +216,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             }
         }
         //Then, pick worker from closest node that satisfy constraints
-        for(Map.Entry<Double, NodeDetails> entry : nodeRankMap.entrySet()) {
-            NodeDetails n = entry.getValue();
+        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
+            RAS_Node n = entry.getValue();
             for(WorkerSlot ws : n.getFreeSlots()) {
                 if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
                     return ws;
@@ -245,15 +244,15 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     private Double getTotalClusterRes(List<String> cluster) {
         Double res = 0.0;
         for (String node : cluster) {
-            res += _availNodes.get(this.NodeHostnameToId(node))
+            res += _nodes.getNodeById(this.NodeHostnameToId(node))
                     .getAvailableMemoryResources()
-                    + _availNodes.get(this.NodeHostnameToId(node))
+                    + _nodes.getNodeById(this.NodeHostnameToId(node))
                     .getAvailableCpuResources();
         }
         return res;
     }
 
-    private Double distToNode(NodeDetails src, NodeDetails dest) {
+    private Double distToNode(RAS_Node src, RAS_Node dest) {
         if (src.getId().equals(dest.getId())) {
             return 0.0;
         } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
@@ -263,7 +262,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         }
     }
 
-    private String NodeToCluster(NodeDetails node) {
+    private String NodeToCluster(RAS_Node node) {
         for (Entry<String, List<String>> entry : _clusterInfo
                 .entrySet()) {
             if (entry.getValue().contains(node.getHostname())) {
@@ -274,27 +273,27 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return null;
     }
     
-    private List<NodeDetails> getAvailableNodes() {
-        LinkedList<NodeDetails> nodes = new LinkedList<>();
+    private List<RAS_Node> getAvailableNodes() {
+        LinkedList<RAS_Node> nodes = new LinkedList<>();
         for (String clusterId : _clusterInfo.keySet()) {
             nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
         }
         return nodes;
     }
 
-    private List<NodeDetails> getAvailableNodesFromCluster(String clus) {
-        List<NodeDetails> retList = new ArrayList<>();
+    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+        List<RAS_Node> retList = new ArrayList<>();
         for (String node_id : _clusterInfo.get(clus)) {
-            retList.add(_availNodes.get(this
+            retList.add(_nodes.getNodeById(this
                     .NodeHostnameToId(node_id)));
         }
         return retList;
     }
 
     private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
-        List<NodeDetails> nodes = this.getAvailableNodesFromCluster(clusterId);
+        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
         List<WorkerSlot> workers = new LinkedList<>();
-        for(NodeDetails node : nodes) {
+        for(RAS_Node node : nodes) {
             workers.addAll(node.getFreeSlots());
         }
         return workers;
@@ -309,13 +308,6 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     }
 
     /**
-     * In case in the future RAS can only use a subset of nodes
-     */
-    private Map<String, NodeDetails> getAvailNodes() {
-        return _nodes;
-    }
-
-    /**
      * Breadth first traversal of the topology DAG
      * @param td
      * @param spouts
@@ -429,7 +421,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             String clusterId = clusterEntry.getKey();
             retVal += "Rack: " + clusterId + "\n";
             for(String nodeHostname : clusterEntry.getValue()) {
-                NodeDetails node = this.idToNode(this.NodeHostnameToId(nodeHostname));
+                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
                 retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
                 retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
                 retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
@@ -444,7 +436,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
      * @return the id of a node
      */
     public String NodeHostnameToId(String hostname) {
-        for (NodeDetails n : _nodes.values()) {
+        for (RAS_Node n : _nodes.getNodes()) {
             if (n.getHostname() == null) {
                 continue;
             }
@@ -461,11 +453,11 @@ public class DefaultResourceAwareStrategy implements IStrategy {
      * @param id
      * @return a RAS_Node object
      */
-    public NodeDetails idToNode(String id) {
-        if(_nodes.containsKey(id) == false) {
+    public RAS_Node idToNode(String id) {
+        RAS_Node ret = _nodes.getNodeById(id);
+        if(ret == null) {
             LOG.error("Cannot find Node with Id: {}", id);
-            return null;
         }
-        return _nodes.get(id);
+        return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 4a1180a..b3b6305 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -19,8 +19,8 @@
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ClusterStateData;
 import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingState;
 
 /**
  * An interface to for implementing different scheduling strategies for the resource aware scheduling
@@ -31,7 +31,7 @@ public interface IStrategy {
     /**
      * initialize prior to scheduling
      */
-    void prepare(ClusterStateData clusterStateData);
+    void prepare(SchedulingState schedulingState);
 
     /**
      * This method is invoked to calcuate a scheduling for topology td
@@ -40,6 +40,8 @@ public interface IStrategy {
      * The strategy must calculate a scheduling in the format of Map<WorkerSlot, Collection<ExecutorDetails>> where the key of
      * this map is the worker slot that the value (collection of executors) should be assigned to.
      * if a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
+     * PLEASE NOTE: Any other operations done on the cluster from a scheduling strategy will NOT persist or be realized.
+     * The data structures passed in can be used in any way necessary to assist in calculating a scheduling, but will NOT actually change the state of the cluster.
      */
     SchedulingResult schedule(TopologyDetails td);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index c4c1b3b..d1c261b 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -1385,4 +1385,49 @@ public class TestResourceAwareScheduler {
             }
         }
     }
+
+    /**
+     * When the first topology fails to be scheduled, make sure subsequent scheduling attempts can still succeed
+     */
+    @Test
+    public void TestSchedulingAfterFailedScheduling() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(8, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+
+        Topologies topologies = new Topologies(topoMap);
+        
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        Assert.assertTrue("Topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+        Assert.assertEquals("Topo-2 all executors scheduled?", 4, cluster.getAssignmentById(topo2.getId()).getExecutorToSlot().size());
+        Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+        Assert.assertEquals("Topo-3 all executors scheduled?", 3, cluster.getAssignmentById(topo3.getId()).getExecutorToSlot().size());
+    }
 }


[2/2] storm git commit: Added STORM-1634 to Changelog

Posted by bo...@apache.org.
Added STORM-1634 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20ba5189
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20ba5189
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20ba5189

Branch: refs/heads/1.x-branch
Commit: 20ba5189fdf8cc5894f163a21c194501de69542d
Parents: bf99ae5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 29 11:11:28 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 29 11:12:25 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/20ba5189/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c36a3b..8fb0902 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1634: Refactoring of Resource Aware Scheduler
  * STORM-1030: Hive Connector Fixes
  * STORM-676: Storm Trident support for sliding/tumbling windows
  * STORM-1630: Add guide page for Windows users