Posted to commits@storm.apache.org by je...@apache.org on 2016/01/19 17:52:55 UTC

[1/5] storm git commit: [STORM-1450] - Fix bugs and refactor code in ResourceAwareScheduler

Repository: storm
Updated Branches:
  refs/heads/master c9d687e7f -> 6e1516256


[STORM-1450] - Fix bugs and refactor code in ResourceAwareScheduler


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/829ea117
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/829ea117
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/829ea117

Branch: refs/heads/master
Commit: 829ea117c8b382ef72d3c5df841534b89f06dffa
Parents: 2289d36
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Jan 13 12:28:38 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 14:24:42 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   6 +-
 .../storm/scheduler/SchedulerAssignment.java    |   3 +
 .../scheduler/SchedulerAssignmentImpl.java      |  14 +
 .../org/apache/storm/scheduler/WorkerSlot.java  |  24 +-
 .../scheduler/resource/ClusterStateData.java    | 101 +++++++
 .../storm/scheduler/resource/RAS_Node.java      | 292 ++++++++++---------
 .../storm/scheduler/resource/RAS_Nodes.java     |  88 ++++--
 .../resource/ResourceAwareScheduler.java        |   7 +-
 .../DefaultResourceAwareStrategy.java           |  82 +++---
 .../strategies/scheduling/IStrategy.java        |   9 +-
 10 files changed, 390 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 3f1d4e5..99260d9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -701,11 +701,7 @@
                        ;; making a map from node+port to WorkerSlot with allocated resources
                        node+port->slot (into {} (for [[[node port] [mem-on-heap mem-off-heap cpu]] worker->resources]
                                                   {[node port]
-                                                   (doto (WorkerSlot. node port)
-                                                     (.allocateResource
-                                                       mem-on-heap
-                                                       mem-off-heap
-                                                       cpu))}))
+                                                   (doto (WorkerSlot. node port mem-on-heap mem-off-heap cpu))}))
                        executor->slot (into {} (for [[executor [node port]] executor->node+port]
                                                  ;; filter out the dead executors
                                                  (if (contains? alive-executors executor)

http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
index 1c8f18e..e1b6605 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.scheduler;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -55,4 +56,6 @@ public interface SchedulerAssignment {
     public Set<ExecutorDetails> getExecutors();
     
     public Set<WorkerSlot> getSlots();
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index 35fd3d7..dffeb47 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -102,4 +103,17 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     public Set<ExecutorDetails> getExecutors() {
         return this.executorToSlot.keySet();
     }
+
+    public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors() {
+        Map<WorkerSlot, Collection<ExecutorDetails>> ret = new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
+            ExecutorDetails exec = entry.getKey();
+            WorkerSlot ws = entry.getValue();
+            if (!ret.containsKey(ws)) {
+                ret.put(ws, new LinkedList<ExecutorDetails>());
+            }
+            ret.get(ws).add(exec);
+        }
+        return ret;
+    }
 }

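The new getSlotToExecutors() inverts the executor-to-slot mapping so callers can work per worker rather than per executor. A minimal usage sketch against the interface above (the dump method and its output format are illustrative, not part of the commit):

    import java.util.Collection;
    import java.util.Map;

    import org.apache.storm.scheduler.ExecutorDetails;
    import org.apache.storm.scheduler.SchedulerAssignment;
    import org.apache.storm.scheduler.WorkerSlot;

    public class SlotGroupingExample {
        // Print each worker slot of an assignment with the executors it hosts.
        static void dumpAssignment(SchedulerAssignment assignment) {
            Map<WorkerSlot, Collection<ExecutorDetails>> bySlot = assignment.getSlotToExecutors();
            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : bySlot.entrySet()) {
                System.out.println(entry.getKey().getId() + " -> " + entry.getValue());
            }
        }
    }
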
http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index f569a50..57cd9c5 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -18,18 +18,25 @@
 package org.apache.storm.scheduler;
 
 public class WorkerSlot {
-    String nodeId;
-    int port;
+    protected String nodeId;
+    protected int port;
     // amount of on-heap memory allocated to it
-    double memOnHeap = 0.0;
+    protected double memOnHeap = 0.0;
     // amount of off-heap memory allocated to it
-    double memOffHeap = 0.0;
+    protected double memOffHeap = 0.0;
     // amount of cpu allocated to it
-    double cpu = 0.0;
+    protected double cpu = 0.0;
     
     public WorkerSlot(String nodeId, Number port) {
+        this(nodeId, port, 0.0, 0.0, 0.0);
+    }
+
+    public WorkerSlot(String nodeId, Number port, double memOnHeap, double memOffHeap, double cpu) {
         this.nodeId = nodeId;
         this.port = port.intValue();
+        this.memOnHeap = memOnHeap;
+        this.memOffHeap = memOffHeap;
+        this.cpu = cpu;
     }
     
     public String getNodeId() {
@@ -40,11 +47,8 @@ public class WorkerSlot {
         return port;
     }
 
-    public WorkerSlot allocateResource(double memOnHeap, double memOffHeap, double cpu) {
-        this.memOnHeap += memOnHeap;
-        this.memOffHeap += memOffHeap;
-        this.cpu += cpu;
-        return this;
+    public String getId() {
+        return this.getNodeId() + ":" + this.getPort();
     }
 
     public double getAllocatedMemOnHeap() {

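With allocateResource() gone, a WorkerSlot's resource allocation is fixed at construction time, and getId() provides a stable "nodeId:port" key for the maps introduced below. A small sketch (node id, port, and resource figures are made up):

    import org.apache.storm.scheduler.WorkerSlot;

    public class WorkerSlotExample {
        public static void main(String[] args) {
            // Resources are supplied up front instead of being accumulated later.
            WorkerSlot slot = new WorkerSlot("sup-1", 6700, 512.0, 256.0, 50.0);
            System.out.println(slot.getId());                 // sup-1:6700
            System.out.println(slot.getAllocatedMemOnHeap()); // 512.0
        }
    }
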
http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
new file mode 100644
index 0000000..0fffe93
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  A class that specifies which cluster data and APIs are exposed to a scheduling strategy
+ */
+public class ClusterStateData {
+
+    private final Cluster cluster;
+
+    public final Topologies topologies;
+
+    // Information regarding all nodes in the cluster
+    public Map<String, NodeDetails> nodes = new HashMap<String, NodeDetails>();
+
+    public static final class NodeDetails {
+
+        private final RAS_Node node;
+
+        public NodeDetails(RAS_Node node) {
+            this.node = node;
+        }
+
+        public String getId() {
+            return this.node.getId();
+        }
+
+        public String getHostname() {
+            return this.node.getHostname();
+        }
+
+        public Collection<WorkerSlot> getFreeSlots() {
+            return this.node.getFreeSlots();
+        }
+
+        public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
+            this.node.consumeResourcesforTask(exec, topo);
+        }
+
+        public Double getAvailableMemoryResources() {
+            return this.node.getAvailableMemoryResources();
+        }
+
+        public Double getAvailableCpuResources() {
+            return this.node.getAvailableCpuResources();
+        }
+
+        public Double getTotalMemoryResources() {
+            return this.node.getTotalMemoryResources();
+        }
+
+        public Double getTotalCpuResources() {
+            return this.node.getTotalCpuResources();
+        }
+    }
+
+    public ClusterStateData(Cluster cluster, Topologies topologies) {
+        this.cluster = cluster;
+        this.topologies = topologies;
+        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+        for(Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
+            this.nodes.put(entry.getKey(), new NodeDetails(entry.getValue()));
+        }
+    }
+
+    public Collection<ExecutorDetails> getUnassignedExecutors(String topoId) {
+        return this.cluster.getUnassignedExecutors(this.topologies.getById(topoId));
+    }
+
+    public Map<String, List<String>> getNetworkTopography() {
+        return this.cluster.getNetworkTopography();
+    }
+}
\ No newline at end of file

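ClusterStateData narrows what a scheduling strategy can see: the Cluster reference stays private, and NodeDetails forwards only a read-only subset of RAS_Node plus consumeResourcesforTask. A sketch of a strategy walking the exposed view (the logging helper is illustrative only):

    import org.apache.storm.scheduler.resource.ClusterStateData;
    import org.apache.storm.scheduler.resource.ClusterStateData.NodeDetails;

    public class StateViewExample {
        // Report the free capacity a strategy is allowed to observe.
        static void logFreeCapacity(ClusterStateData state) {
            for (NodeDetails node : state.nodes.values()) {
                System.out.println(node.getHostname()
                        + " free mem: " + node.getAvailableMemoryResources()
                        + " free cpu: " + node.getAvailableCpuResources()
                        + " free slots: " + node.getFreeSlots().size());
            }
        }
    }
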
http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index 4e92c3f..3e10085 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -43,32 +43,53 @@ import org.apache.storm.scheduler.WorkerSlot;
  */
 public class RAS_Node {
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
-    private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
-    private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+
+    //A map of all workers on the node.
+    //The key is the worker id and the value is the corresponding WorkerSlot object.
+    Map<String, WorkerSlot> _slots = new HashMap<String, WorkerSlot> ();
+
+    // A map describing which topologies are using which slots on this node.  The format of the map is the following:
+    // {TopologyId -> {WorkerId -> {Executors}}}
+    private Map<String, Map<String, Collection<ExecutorDetails>>> _topIdToUsedSlots = new HashMap<String, Map<String, Collection<ExecutorDetails>>>();
+
     private final String _nodeId;
     private String _hostname;
     private boolean _isAlive;
     private SupervisorDetails _sup;
-    private Double _availMemory;
-    private Double _availCPU;
-    private Cluster _cluster;
-    private Topologies _topologies;
+    private Double _availMemory = 0.0;
+    private Double _availCPU = 0.0;
+    private final Cluster _cluster;
+    private final Topologies _topologies;
 
-    public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
-                    SupervisorDetails sup, Cluster cluster, Topologies topologies) {
+    public RAS_Node(String nodeId, SupervisorDetails sup, Cluster cluster, Topologies topologies, Map<String, WorkerSlot> workerIdToWorker, Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
+        //Node ID and supervisor ID are the same.
         _nodeId = nodeId;
-        _isAlive = isAlive;
-        if (_isAlive && allPorts != null) {
-            for (int port : allPorts) {
-                _freeSlots.add(new WorkerSlot(_nodeId, port));
-            }
-            _sup = sup;
+        _isAlive = !cluster.isBlackListed(_nodeId);
+
+        //check if node is alive
+        if (_isAlive && sup != null) {
             _hostname = sup.getHost();
+            _sup = sup;
             _availMemory = getTotalMemoryResources();
             _availCPU = getTotalCpuResources();
+
+            LOG.debug("Found a {} Node {} {}",
+                    _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
+            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
         }
+
         _cluster = cluster;
         _topologies = topologies;
+
+        // initialize slots for this node
+        if (workerIdToWorker != null) {
+            _slots = workerIdToWorker;
+        }
+
+        //initialize assignment map
+        if (assignmentMap != null) {
+            _topIdToUsedSlots = assignmentMap;
+        }
     }
 
     public String getId() {
@@ -79,18 +100,45 @@ public class RAS_Node {
         return _hostname;
     }
 
+    private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
+        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
+        for (String workerId : workerIds) {
+            ret.add(_slots.get(workerId));
+        }
+        return ret;
+    }
+
+    public Collection<String> getFreeSlotsId() {
+        if(!_isAlive) {
+            return new HashSet<String>();
+        }
+        Collection<String> usedSlotsId = getUsedSlotsId();
+        Set<String> ret = new HashSet<>();
+        ret.addAll(_slots.keySet());
+        ret.removeAll(usedSlotsId);
+        return ret;
+    }
+
     public Collection<WorkerSlot> getFreeSlots() {
-        return _freeSlots;
+        return workerIdsToWorkers(getFreeSlotsId());
     }
 
-    public Collection<WorkerSlot> getUsedSlots() {
-        Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
-        for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
-            ret.addAll(workers);
+    public Collection<String> getUsedSlotsId() {
+        Collection<String> ret = new LinkedList<String>();
+        for (Map<String, Collection<ExecutorDetails>> entry : _topIdToUsedSlots.values()) {
+            ret.addAll(entry.keySet());
         }
         return ret;
     }
 
+    public Collection<WorkerSlot> getUsedSlots() {
+        return workerIdsToWorkers(getUsedSlotsId());
+    }
+
+    public Collection<WorkerSlot> getUsedSlots(String topId) {
+        return workerIdsToWorkers(_topIdToUsedSlots.get(topId).keySet());
+    }
+
     public boolean isAlive() {
         return _isAlive;
     }
@@ -103,92 +151,37 @@ public class RAS_Node {
     }
 
     public boolean isTotallyFree() {
-        return _topIdToUsedSlots.isEmpty();
+        return getUsedSlots().isEmpty();
     }
 
     public int totalSlotsFree() {
-        return _freeSlots.size();
+        return getFreeSlots().size();
     }
 
     public int totalSlotsUsed() {
-        int total = 0;
-        for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
-            total += slots.size();
-        }
-        return total;
+        return getUsedSlots().size();
     }
 
     public int totalSlots() {
-        return totalSlotsFree() + totalSlotsUsed();
+        return _slots.size();
     }
 
     public int totalSlotsUsed(String topId) {
-        int total = 0;
-        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-        if (slots != null) {
-            total = slots.size();
-        }
-        return total;
-    }
-
-    private void validateSlot(WorkerSlot ws) {
-        if (!_nodeId.equals(ws.getNodeId())) {
-            throw new IllegalArgumentException(
-                    "Trying to add a slot to the wrong node " + ws +
-                            " is not a part of " + _nodeId);
-        }
-    }
-
-    void addOrphanedSlot(WorkerSlot ws) {
-        if (_isAlive) {
-            throw new IllegalArgumentException("Orphaned Slots " +
-                    "only are allowed on dead nodes.");
-        }
-        validateSlot(ws);
-        if (_freeSlots.contains(ws)) {
-            return;
-        }
-        for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
-            if (used.contains(ws)) {
-                return;
-            }
-        }
-        _freeSlots.add(ws);
-    }
-
-    boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
-        validateSlot(ws);
-        if (!_freeSlots.remove(ws)) {
-            if (dontThrow) {
-                return true;
-            }
-            throw new IllegalStateException("Assigning a slot that was not free " + ws);
-        }
-        Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
-        if (usedSlots == null) {
-            usedSlots = new HashSet<WorkerSlot>();
-            _topIdToUsedSlots.put(topId, usedSlots);
-        }
-        usedSlots.add(ws);
-        return false;
+        return getUsedSlots(topId).size();
     }
 
     /**
      * Free all slots on this node.  This will update the Cluster too.
      */
-    public void freeAllSlots() {
+     public void freeAllSlots() {
         if (!_isAlive) {
             LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
         }
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-            _cluster.freeSlots(entry.getValue());
-            _availCPU = getTotalCpuResources();
-            _availMemory = getAvailableMemoryResources();
-            if (_isAlive) {
-                _freeSlots.addAll(entry.getValue());
-            }
-        }
-        _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+        _cluster.freeSlots(_slots.values());
+        _availCPU = getTotalCpuResources();
+        _availMemory = getAvailableMemoryResources();
+        //clearing assignments
+        _topIdToUsedSlots.clear();
     }
 
     /**
@@ -197,43 +190,26 @@ public class RAS_Node {
      */
     public void free(WorkerSlot ws) {
         LOG.info("freeing WorkerSlot {} on node {}", ws, _hostname);
-        if (_freeSlots.contains(ws)) return;
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-            Set<WorkerSlot> slots = entry.getValue();
-            double memUsed = getMemoryUsedByWorker(ws);
-            double cpuUsed = getCpuUsedByWorker(ws);
-            if (slots.remove(ws)) {
-                _cluster.freeSlot(ws);
-                if (_isAlive) {
-                    _freeSlots.add(ws);
-                }
-                freeMemory(memUsed);
-                freeCPU(cpuUsed);
-                return;
-            }
+        if (!_slots.containsKey(ws.getId())) {
+            throw new IllegalArgumentException("Tried to free a slot " + ws + " that was not" +
+                    " part of this node " + _nodeId);
         }
-        throw new IllegalArgumentException("Tried to free a slot that was not" +
-                " part of this node " + _nodeId);
-    }
 
-    /**
-     * Frees all the slots for a topology.
-     * @param topId the topology to free slots for
-     */
-    public void freeTopology(String topId) {
-        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-        if (slots == null || slots.isEmpty()) {
-            return;
-        }
-        for (WorkerSlot ws : slots) {
-            _cluster.freeSlot(ws);
-            freeMemory(getMemoryUsedByWorker(ws));
-            freeCPU(getCpuUsedByWorker(ws));
-            if (_isAlive) {
-                _freeSlots.add(ws);
-            }
+        TopologyDetails topo = findTopologyUsingWorker(ws);
+        if (topo == null) {
+            throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
         }
-        _topIdToUsedSlots.remove(topId);
+
+        //free slot
+        _cluster.freeSlot(ws);
+
+        //cleanup internal assignments
+        _topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
+
+        double memUsed = getMemoryUsedByWorker(ws);
+        double cpuUsed = getCpuUsedByWorker(ws);
+        freeMemory(memUsed);
+        freeCPU(cpuUsed);
     }
 
     private void freeMemory(double amount) {
@@ -254,6 +230,9 @@ public class RAS_Node {
         _availCPU += amount;
     }
 
+    /**
+     * Get the amount of memory used by a worker
+     */
     public double getMemoryUsedByWorker(WorkerSlot ws) {
         TopologyDetails topo = findTopologyUsingWorker(ws);
         if (topo == null) {
@@ -267,6 +246,9 @@ public class RAS_Node {
         return totalMemoryUsed;
     }
 
+    /**
+     * Get the amount of CPU used by a worker
+     */
     public double getCpuUsedByWorker(WorkerSlot ws) {
         TopologyDetails topo = findTopologyUsingWorker(ws);
         if (topo == null) {
@@ -280,12 +262,16 @@ public class RAS_Node {
         return totalCpuUsed;
     }
 
+    /**
+     * Find which topology is running on a worker slot
+     * @return the topology using the worker slot, or null if the worker slot is free
+     */
     public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
-        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+        for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
             String topoId = entry.getKey();
-            Set<WorkerSlot> workers = entry.getValue();
-            for (WorkerSlot worker : workers) {
-                if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
+            Set<String> workerIds = entry.getValue().keySet();
+            for (String workerId : workerIds) {
+                if(ws.getId().equals(workerId)) {
                     return _topologies.getById(topoId);
                 }
             }
@@ -299,7 +285,28 @@ public class RAS_Node {
      * @param executors the executors to run in that slot.
      * @param slot the slot to allocate resource to
      */
-    public void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+//    private void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+//        double onHeapMem = 0.0;
+//        double offHeapMem = 0.0;
+//        double cpu = 0.0;
+//        for (ExecutorDetails exec : executors) {
+//            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
+//            if (onHeapMemForExec != null) {
+//                onHeapMem += onHeapMemForExec;
+//            }
+//            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
+//            if (offHeapMemForExec != null) {
+//                offHeapMem += offHeapMemForExec;
+//            }
+//            Double cpuForExec = td.getTotalCpuReqTask(exec);
+//            if (cpuForExec != null) {
+//                cpu += cpuForExec;
+//            }
+//        }
+//        slot.allocateResource(onHeapMem, offHeapMem, cpu);
+//    }
+
+    private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
         double onHeapMem = 0.0;
         double offHeapMem = 0.0;
         double cpu = 0.0;
@@ -317,35 +324,48 @@ public class RAS_Node {
                 cpu += cpuForExec;
             }
         }
-        slot.allocateResource(onHeapMem, offHeapMem, cpu);
+        return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
     }
 
+    /**
+     * Assigns executors from a topology to a worker slot on this node
+     * @param target the worker slot to assign the executors
+     * @param td the topology the executors are from
+     * @param executors executors to assign to the specified worker slot
+     */
     public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
         if (!_isAlive) {
             throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
         }
-        if (_freeSlots.isEmpty()) {
+        Collection<WorkerSlot> freeSlots = getFreeSlots();
+        if (freeSlots.isEmpty()) {
             throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
         }
         if (executors.size() == 0) {
             LOG.warn("Trying to assign nothing from " + td.getId() + " to " + _nodeId + " (Ignored)");
         }
-
         if (target == null) {
-            target = _freeSlots.iterator().next();
+            target = getFreeSlots().iterator().next();
         }
-        if (!_freeSlots.contains(target)) {
+        if (!freeSlots.contains(target)) {
             throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
-        } else {
-            allocateResourceToSlot(td, executors, target);
-            _cluster.assign(target, td.getId(), executors);
-            assignInternal(target, td.getId(), false);
         }
+        target = allocateResourceToSlot(td, executors, target);
+        _cluster.assign(target, td.getId(), executors);
+
+        //assigning internally
+        if (!_topIdToUsedSlots.containsKey(td.getId())) {
+            _topIdToUsedSlots.put(td.getId(), new HashMap<String, Collection<ExecutorDetails>>());
+        }
+
+        if (!_topIdToUsedSlots.get(td.getId()).containsKey(target.getId())) {
+            _topIdToUsedSlots.get(td.getId()).put(target.getId(), new LinkedList<ExecutorDetails>());
+        }
+        _topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
     }
 
     /**
      * Assign a free slot on the node to the following topology and executors.
-     * This will update the cluster too.
      * @param td the TopologyDetails to assign a free slot to.
      * @param executors the executors to run in that slot.
      */
@@ -521,8 +541,4 @@ public class RAS_Node {
         consumeCPU(taskCpuReq);
         consumeMemory(taskMemReq);
     }
-
-    public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
-        return _topIdToUsedSlots;
-    }
 }

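The refactor replaces the old mutable _freeSlots set with a single _slots map plus the per-topology assignment map, so free slots are now derived on demand (all worker ids minus used worker ids) instead of being shuffled between collections. A toy illustration of that derivation (worker ids are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class SlotBookkeepingSketch {
        public static void main(String[] args) {
            // All workers on the node vs. workers referenced by some assignment.
            Set<String> allWorkerIds = new HashSet<>(Arrays.asList("n1:6700", "n1:6701", "n1:6702"));
            Set<String> usedWorkerIds = new HashSet<>(Arrays.asList("n1:6701"));

            // Free slots are computed on demand, mirroring getFreeSlotsId().
            Set<String> freeWorkerIds = new HashSet<>(allWorkerIds);
            freeWorkerIds.removeAll(usedWorkerIds);
            System.out.println(freeWorkerIds); // e.g. [n1:6700, n1:6702]
        }
    }
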
http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
index f6662e9..389a63c 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.scheduler.resource;
 
-import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -30,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
 public class RAS_Nodes {
@@ -43,37 +43,62 @@ public class RAS_Nodes {
     }
 
     public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
+
+        //A map of node ids to node objects
         Map<String, RAS_Node> nodeIdToNode = new HashMap<String, RAS_Node>();
-        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
-            //Node ID and supervisor ID are the same.
-            String id = sup.getId();
-            boolean isAlive = !cluster.isBlackListed(id);
-            LOG.debug("Found a {} Node {} {}",
-                    isAlive ? "living" : "dead", id, sup.getAllPorts());
-            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
-            nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster, topologies));
-        }
+        //A map of assignments organized by node with the following format:
+        //{nodeId -> {topologyId -> {workerId -> {execs}}}}
+        Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> assignmentRelationshipMap
+                = new HashMap<String, Map<String, Map<String, Collection<ExecutorDetails>>>>();
+
+        Map<String, Map<String, WorkerSlot>> workerIdToWorker = new HashMap<String, Map<String, WorkerSlot>>();
         for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
             String topId = assignment.getTopologyId();
-            for (WorkerSlot workerSlot : assignment.getSlots()) {
-                String id = workerSlot.getNodeId();
-                RAS_Node node = nodeIdToNode.get(id);
-                if (node == null) {
-                    LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
-                            workerSlot, RAS_Node.getExecutors(workerSlot, cluster));
-                    node = new RAS_Node(id, null, false, null, cluster, topologies);
-                    nodeIdToNode.put(id, node);
+
+            for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : assignment.getSlotToExecutors().entrySet()) {
+                WorkerSlot slot = entry.getKey();
+                String nodeId = slot.getNodeId();
+                Collection<ExecutorDetails> execs = entry.getValue();
+                if (!assignmentRelationshipMap.containsKey(nodeId)) {
+                    assignmentRelationshipMap.put(nodeId, new HashMap<String, Map<String, Collection<ExecutorDetails>>>());
+                    workerIdToWorker.put(nodeId, new HashMap<String, WorkerSlot>());
+                }
+                workerIdToWorker.get(nodeId).put(slot.getId(), slot);
+                if (!assignmentRelationshipMap.get(nodeId).containsKey(topId)) {
+                    assignmentRelationshipMap.get(nodeId).put(topId, new HashMap<String, Collection<ExecutorDetails>>());
                 }
-                if (!node.isAlive()) {
-                    //The supervisor on the node is down so add an orphaned slot to hold the unsupervised worker
-                    node.addOrphanedSlot(workerSlot);
+                if (!assignmentRelationshipMap.get(nodeId).get(topId).containsKey(slot.getId())) {
+                    assignmentRelationshipMap.get(nodeId).get(topId).put(slot.getId(), new LinkedList<ExecutorDetails>());
+                }
+                assignmentRelationshipMap.get(nodeId).get(topId).get(slot.getId()).addAll(execs);
+            }
+        }
+
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            //Initialize a worker slot for every port even if there is no assignment to it
+            for (int port : sup.getAllPorts()) {
+                WorkerSlot worker = new WorkerSlot(sup.getId(), port);
+                if (!workerIdToWorker.containsKey(sup.getId())) {
+                    workerIdToWorker.put(sup.getId(), new HashMap<String, WorkerSlot>());
                 }
-                if (node.assignInternal(workerSlot, topId, true)) {
-                    LOG.warn("Bad scheduling state, {} assigned multiple workers, unassigning everything...", workerSlot);
-                    node.free(workerSlot);
+                if (!workerIdToWorker.get(sup.getId()).containsKey(worker.getId())) {
+                    workerIdToWorker.get(sup.getId()).put(worker.getId(), worker);
                 }
             }
+            nodeIdToNode.put(sup.getId(), new RAS_Node(sup.getId(), sup, cluster, topologies, workerIdToWorker.get(sup.getId()), assignmentRelationshipMap.get(sup.getId())));
         }
+
+    //Add in supervisors that might have crashed but whose workers are still alive
+        for(Map.Entry<String, Map<String, Map<String, Collection<ExecutorDetails>>>> entry : assignmentRelationshipMap.entrySet()) {
+            String nodeId = entry.getKey();
+            Map<String, Map<String, Collection<ExecutorDetails>>> assignments = entry.getValue();
+            if (!nodeIdToNode.containsKey(nodeId)) {
+                LOG.info("Found an assigned slot(s) on a dead supervisor {} with assignments {}",
+                        nodeId, assignments);
+                nodeIdToNode.put(nodeId, new RAS_Node(nodeId, null, cluster, topologies, workerIdToWorker.get(nodeId), assignments));
+            }
+        }
+
         updateAvailableResources(cluster, topologies, nodeIdToNode);
         return nodeIdToNode;
     }
@@ -113,13 +138,7 @@ public class RAS_Nodes {
                     if (topoMemoryResourceList.containsKey(exec)) {
                         node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
                     } else {
-                        LOG.warn("Resource Req not found...Scheduling Task {} with memory requirement as on heap - {} " +
-                                        "and off heap - {} and CPU requirement as {}",
-                                exec,
-                                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
-                                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
-                        topologies.getById(entry.getKey()).addDefaultResforExec(exec);
-                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
+                        throw new IllegalStateException("Executor " + exec + "not found!");
                     }
                 }
             }
@@ -130,10 +149,17 @@ public class RAS_Nodes {
         }
     }
 
+    /**
+     * Get the node object corresponding to a node id
+     */
     public RAS_Node getNodeById(String nodeId) {
         return this.nodeMap.get(nodeId);
     }
 
+    /**
+     * Free the given worker slots and release their resources on the owning nodes.
+     * @param workerSlots the slots to free
+     */
     public void freeSlots(Collection<WorkerSlot> workerSlots) {
         for (RAS_Node node : nodeMap.values()) {
             for (WorkerSlot ws : node.getUsedSlots()) {

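The repeated containsKey/put chains that build the nested {nodeId -> {topologyId -> {workerId -> executors}}} map work, but on Java 8+ the same bookkeeping could be written with computeIfAbsent. A sketch of that alternative (not the committed code):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;

    import org.apache.storm.scheduler.ExecutorDetails;
    import org.apache.storm.scheduler.WorkerSlot;

    public class AssignmentIndexSketch {
        // index shape: {nodeId -> {topologyId -> {workerId -> executors}}}
        static void record(Map<String, Map<String, Map<String, Collection<ExecutorDetails>>>> index,
                           String topId, WorkerSlot slot, Collection<ExecutorDetails> execs) {
            index.computeIfAbsent(slot.getNodeId(), k -> new HashMap<>())
                 .computeIfAbsent(topId, k -> new HashMap<>())
                 .computeIfAbsent(slot.getId(), k -> new LinkedList<>())
                 .addAll(execs);
        }
    }
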
http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 638af0d..a913e69 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -120,6 +120,8 @@ public class ResourceAwareScheduler implements IScheduler {
                 break;
             }
             scheduleTopology(td);
+
+            LOG.info("Nodes after scheduling:{}", this.nodes);
         }
     }
 
@@ -146,7 +148,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 SchedulingResult result = null;
                 try {
                     //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
-                    rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                    rasStrategy.prepare(new ClusterStateData(this.cluster, this.topologies));
                     result = rasStrategy.schedule(td);
                 } catch (Exception ex) {
                     LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
@@ -254,6 +256,9 @@ public class ResourceAwareScheduler implements IScheduler {
                 Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
                 RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
                 targetNode.assign(targetSlot, td, execsNeedScheduling);
+                for (ExecutorDetails exec : execsNeedScheduling) {
+                    targetNode.consumeResourcesforTask(exec, td);
+                }
                 LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
                         td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
                 if (!nodesUsed.contains(targetNode.getId())) {

http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 2ceed21..86d12b0 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -30,44 +30,39 @@ import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Iterator;
 
-import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ClusterStateData.NodeDetails;
+import org.apache.storm.scheduler.resource.ClusterStateData;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
-import org.apache.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 import org.apache.storm.scheduler.resource.Component;
-import org.apache.storm.scheduler.resource.RAS_Node;
 
 public class DefaultResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
-    private Topologies _topologies;
-    private Cluster _cluster;
-    //Map key is the supervisor id and the value is the corresponding RAS_Node Object 
-    private Map<String, RAS_Node> _availNodes;
-    private RAS_Node refNode = null;
+    private ClusterStateData _clusterStateData;
+    //Map key is the supervisor id and the value is the corresponding RAS_Node Object
+    private Map<String, NodeDetails> _availNodes;
+    private NodeDetails refNode = null;
     /**
      * supervisor id -> Node
      */
-    private Map<String, RAS_Node> _nodes;
+    private Map<String, NodeDetails> _nodes;
     private Map<String, List<String>> _clusterInfo;
 
     private final double CPU_WEIGHT = 1.0;
     private final double MEM_WEIGHT = 1.0;
     private final double NETWORK_WEIGHT = 1.0;
 
-    public void prepare (Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
-        _topologies = topologies;
-        _cluster = cluster;
-        _nodes = RAS_Nodes.getAllNodesFrom(cluster, _topologies);
+    public void prepare (ClusterStateData clusterStateData) {
+        _clusterStateData = clusterStateData;
+        _nodes = clusterStateData.nodes;
         _availNodes = this.getAvailNodes();
-        _clusterInfo = cluster.getNetworkTopography();
+        _clusterInfo = _clusterStateData.getNetworkTopography();
         LOG.debug(this.getClusterInfo());
     }
 
@@ -93,18 +88,18 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             LOG.warn("No available nodes to schedule tasks on!");
             return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
         }
-        Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
+        Collection<ExecutorDetails> unassignedExecutors = _clusterStateData.getUnassignedExecutors(td.getId());
         Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
-        List<Component> spouts = this.getSpouts(_topologies, td);
+        List<Component> spouts = this.getSpouts(td);
 
         if (spouts.size() == 0) {
             LOG.error("Cannot find a Spout!");
             return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
         }
 
-        Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
+        Queue<Component> ordered__Component_list = bfs(td, spouts);
 
         Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
         Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
@@ -121,7 +116,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
                     td.getTaskResourceReqList(exec), entry.getKey() });
                     WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
                     if (targetSlot != null) {
-                        RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                        NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
                         if(!schedulerAssignmentMap.containsKey(targetSlot)) {
                             schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
                         }
@@ -147,7 +142,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         for (ExecutorDetails exec : executorsNotScheduled) {
             WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
             if (targetSlot != null) {
-                RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
+                NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
                 if(!schedulerAssignmentMap.containsKey(targetSlot)) {
                     schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
                 }
@@ -205,7 +200,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         double taskMem = td.getTotalMemReqTask(exec);
         double taskCPU = td.getTotalCpuReqTask(exec);
-        List<RAS_Node> nodes;
+        List<NodeDetails> nodes;
         if(clusterId != null) {
             nodes = this.getAvailableNodesFromCluster(clusterId);
             
@@ -213,8 +208,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             nodes = this.getAvailableNodes();
         }
         //First sort nodes by distance
-        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
-        for (RAS_Node n : nodes) {
+        TreeMap<Double, NodeDetails> nodeRankMap = new TreeMap<>();
+        for (NodeDetails n : nodes) {
             if(n.getFreeSlots().size()>0) {
                 if (n.getAvailableMemoryResources() >= taskMem
                         && n.getAvailableCpuResources() >= taskCPU) {
@@ -233,8 +228,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             }
         }
         //Then, pick worker from closest node that satisfy constraints
-        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
-            RAS_Node n = entry.getValue();
+        for(Map.Entry<Double, NodeDetails> entry : nodeRankMap.entrySet()) {
+            NodeDetails n = entry.getValue();
             for(WorkerSlot ws : n.getFreeSlots()) {
                 if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
                     return ws;
@@ -269,7 +264,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return res;
     }
 
-    private Double distToNode(RAS_Node src, RAS_Node dest) {
+    private Double distToNode(NodeDetails src, NodeDetails dest) {
         if (src.getId().equals(dest.getId())) {
             return 0.0;
         } else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
@@ -279,7 +274,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         }
     }
 
-    private String NodeToCluster(RAS_Node node) {
+    private String NodeToCluster(NodeDetails node) {
         for (Entry<String, List<String>> entry : _clusterInfo
                 .entrySet()) {
             if (entry.getValue().contains(node.getHostname())) {
@@ -290,16 +285,16 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return null;
     }
     
-    private List<RAS_Node> getAvailableNodes() {
-        LinkedList<RAS_Node> nodes = new LinkedList<>();
+    private List<NodeDetails> getAvailableNodes() {
+        LinkedList<NodeDetails> nodes = new LinkedList<>();
         for (String clusterId : _clusterInfo.keySet()) {
             nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
         }
         return nodes;
     }
 
-    private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
-        List<RAS_Node> retList = new ArrayList<>();
+    private List<NodeDetails> getAvailableNodesFromCluster(String clus) {
+        List<NodeDetails> retList = new ArrayList<>();
         for (String node_id : _clusterInfo.get(clus)) {
             retList.add(_availNodes.get(this
                     .NodeHostnameToId(node_id)));
@@ -308,9 +303,9 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     }
 
     private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
-        List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
+        List<NodeDetails> nodes = this.getAvailableNodesFromCluster(clusterId);
         List<WorkerSlot> workers = new LinkedList<>();
-        for(RAS_Node node : nodes) {
+        for(NodeDetails node : nodes) {
             workers.addAll(node.getFreeSlots());
         }
         return workers;
@@ -327,18 +322,17 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     /**
      * In case, in the future, RAS can use only a subset of nodes
      */
-    private Map<String, RAS_Node> getAvailNodes() {
+    private Map<String, NodeDetails> getAvailNodes() {
         return _nodes;
     }
 
     /**
      * Breadth first traversal of the topology DAG
-     * @param topologies
      * @param td
      * @param spouts
      * @return A partial ordering of components
      */
-    private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
+    private Queue<Component> bfs(TopologyDetails td, List<Component> spouts) {
         // Since Queue is an interface
         Queue<Component> ordered__Component_list = new LinkedList<Component>();
         HashMap<String, Component> visited = new HashMap<>();
@@ -357,7 +351,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
                     neighbors.addAll(comp.parents);
                     for (String nbID : neighbors) {
                         if (!visited.containsKey(nbID)) {
-                            Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
+                            Component child = td.getComponents().get(nbID);
                             queue.offer(child);
                         }
                     }
@@ -367,10 +361,10 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return ordered__Component_list;
     }
 
-    private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
+    private List<Component> getSpouts(TopologyDetails td) {
         List<Component> spouts = new ArrayList<>();
-        for (Component c : topologies.getAllComponents().get(td.getId())
-                .values()) {
+
+        for (Component c : td.getComponents().values()) {
             if (c.type == Component.ComponentType.SPOUT) {
                 spouts.add(c);
             }
@@ -446,7 +440,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
             String clusterId = clusterEntry.getKey();
             retVal += "Rack: " + clusterId + "\n";
             for(String nodeHostname : clusterEntry.getValue()) {
-                RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
+                NodeDetails node = this.idToNode(this.NodeHostnameToId(nodeHostname));
                 retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
                 retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
                 retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
@@ -461,7 +455,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
      * @return the id of a node
      */
     public String NodeHostnameToId(String hostname) {
-        for (RAS_Node n : _nodes.values()) {
+        for (NodeDetails n : _nodes.values()) {
             if (n.getHostname() == null) {
                 continue;
             }
@@ -478,7 +472,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
      * @param id
      * @return a RAS_Node object
      */
-    public RAS_Node idToNode(String id) {
+    public NodeDetails idToNode(String id) {
         if(_nodes.containsKey(id) == false) {
             LOG.error("Cannot find Node with Id: {}", id);
             return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/829ea117/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 01cd808..58eb236 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -18,14 +18,9 @@
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
-import java.util.Map;
-
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ClusterStateData;
 import org.apache.storm.scheduler.resource.SchedulingResult;
-import org.apache.storm.scheduler.resource.User;
 
 /**
  * An interface for implementing different scheduling strategies for resource aware scheduling
@@ -36,7 +31,7 @@ public interface IStrategy {
     /**
      * initialize prior to scheduling
      */
-    public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+    public void prepare(ClusterStateData clusterStateData);
 
     /**
      * This method is invoked to calculate a scheduling for topology td

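With this change a strategy is handed only a ClusterStateData snapshot. A minimal, hypothetical implementation of the new contract (the class name and its refuse-everything behavior are illustrative):

    import org.apache.storm.scheduler.TopologyDetails;
    import org.apache.storm.scheduler.resource.ClusterStateData;
    import org.apache.storm.scheduler.resource.SchedulingResult;
    import org.apache.storm.scheduler.resource.SchedulingStatus;
    import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;

    public class NoopStrategy implements IStrategy {
        private ClusterStateData state;

        @Override
        public void prepare(ClusterStateData clusterStateData) {
            this.state = clusterStateData;
        }

        @Override
        public SchedulingResult schedule(TopologyDetails td) {
            // A real strategy would place td's unassigned executors onto free
            // slots drawn from state.nodes; this stub simply declines.
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
                    "noop strategy never schedules");
        }
    }
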

[4/5] storm git commit: edits based on comments

Posted by je...@apache.org.
edits based on comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05294108
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05294108
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05294108

Branch: refs/heads/master
Commit: 052941084bc7fcaee3d433d39d2bcc7a9953c69a
Parents: 4722c6f
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Jan 14 23:33:28 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 23:33:28 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  2 +-
 .../org/apache/storm/scheduler/WorkerSlot.java  |  2 +-
 .../storm/scheduler/resource/RAS_Node.java      |  5 +-
 .../DefaultResourceAwareStrategy.java           | 57 ++++++++------------
 .../strategies/scheduling/IStrategy.java        |  4 +-
 5 files changed, 29 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0555ed9..58a2a22 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -701,7 +701,7 @@
                        ;; making a map from node+port to WorkerSlot with allocated resources
                        node+port->slot (into {} (for [[[node port] [mem-on-heap mem-off-heap cpu]] worker->resources]
                                                   {[node port]
-                                                   (doto (WorkerSlot. node port mem-on-heap mem-off-heap cpu))}))
+                                                   (WorkerSlot. node port mem-on-heap mem-off-heap cpu)}))
                        executor->slot (into {} (for [[executor [node port]] executor->node+port]
                                                  ;; filter out the dead executors
                                                  (if (contains? alive-executors executor)

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 423764d..308af85 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -48,7 +48,7 @@ public class WorkerSlot {
     }
 
     public String getId() {
-        return this.getNodeId() + ":" + this.getPort();
+        return getNodeId() + ":" + getPort();
     }
 
     public double getAllocatedMemOnHeap() {

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index e33122b..8d37805 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -99,7 +99,7 @@ public class RAS_Node {
     }
 
     /**
-     * intializes resource usages on node
+     * initializes resource usages on node
      */
     private void intializeResources() {
         for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
@@ -204,7 +204,7 @@ public class RAS_Node {
     /**
      * Free all slots on this node.  This will update the Cluster too.
      */
-     public void freeAllSlots() {
+    public void freeAllSlots() {
         if (!_isAlive) {
             LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
         }
@@ -269,7 +269,6 @@ public class RAS_Node {
             return 0.0;
         }
         Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
-        LOG.info("getMemoryUsedByWorker executors: {}", execs);
         double totalMemoryUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);

http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 86d12b0..9ecba47 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -114,23 +114,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
                     LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
                             new Object[] { exec, td.getExecutorToComponent().get(exec),
                     td.getTaskResourceReqList(exec), entry.getKey() });
-                    WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-                    if (targetSlot != null) {
-                        NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
-                        if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                            schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                        }
-                       
-                        schedulerAssignmentMap.get(targetSlot).add(exec);
-                        targetNode.consumeResourcesforTask(exec, td);
-                        scheduledTasks.add(exec);
-                        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                                targetNode, targetNode.getAvailableMemoryResources(),
-                                targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                                targetNode.getTotalCpuResources(), targetSlot);
-                    } else {
-                        LOG.error("Not Enough Resources to schedule Task {}", exec);
-                    }
+                    scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
                     it.remove();
                 }
             }
@@ -140,23 +124,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
         // schedule left over system tasks
         for (ExecutorDetails exec : executorsNotScheduled) {
-            WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
-            if (targetSlot != null) {
-                NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
-                if(!schedulerAssignmentMap.containsKey(targetSlot)) {
-                    schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
-                }
-               
-                schedulerAssignmentMap.get(targetSlot).add(exec);
-                targetNode.consumeResourcesforTask(exec, td);
-                scheduledTasks.add(exec);
-                LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
-                        targetNode, targetNode.getAvailableMemoryResources(),
-                        targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
-                        targetNode.getTotalCpuResources(), targetSlot);
-            } else {
-                LOG.error("Not Enough Resources to schedule Task {}", exec);
-            }
+            scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
         }
 
         SchedulingResult result;
@@ -177,6 +145,27 @@ public class DefaultResourceAwareStrategy implements IStrategy {
         return result;
     }
 
+    private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot,
+            Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
+        WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
+        if (targetSlot != null) {
+            NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
+            if (!schedulerAssignmentMap.containsKey(targetSlot)) {
+                schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
+            }
+
+            schedulerAssignmentMap.get(targetSlot).add(exec);
+            targetNode.consumeResourcesforTask(exec, td);
+            scheduledTasks.add(exec);
+            LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
+                    targetNode, targetNode.getAvailableMemoryResources(),
+                    targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
+                    targetNode.getTotalCpuResources(), targetSlot);
+        } else {
+            LOG.error("Not Enough Resources to schedule Task {}", exec);
+        }
+    }
+
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
       WorkerSlot ws;
       // first scheduling

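The extracted scheduleExecutor keeps the existing get-or-create idiom for the slot-to-executors map. The pattern in isolation, with plain strings standing in for WorkerSlot and ExecutorDetails:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;

    public class AssignmentMapPattern {
        public static void main(String[] args) {
            Map<String, Collection<String>> slotToExecs = new HashMap<String, Collection<String>>();
            String slotId = "sup-1:6700";
            // Create the executor list for a slot on first use, then append.
            if (!slotToExecs.containsKey(slotId)) {
                slotToExecs.put(slotId, new LinkedList<String>());
            }
            slotToExecs.get(slotId).add("[1, 1]");
            System.out.println(slotToExecs); // {sup-1:6700=[[1, 1]]}
        }
    }

(On Java 8+ the same thing collapses to slotToExecs.computeIfAbsent(slotId, k -> new LinkedList<String>()).add(...).)
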
http://git-wip-us.apache.org/repos/asf/storm/blob/05294108/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 58eb236..4a1180a 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -31,7 +31,7 @@ public interface IStrategy {
     /**
      * initialize prior to scheduling
      */
-    public void prepare(ClusterStateData clusterStateData);
+    void prepare(ClusterStateData clusterStateData);
 
     /**
      * This method is invoked to calculate a scheduling for topology td
@@ -41,5 +41,5 @@ public interface IStrategy {
      * this map is the worker slot that the value (collection of executors) should be assigned to.
      * if a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
      */
-    public SchedulingResult schedule(TopologyDetails td);
+    SchedulingResult schedule(TopologyDetails td);
 }

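Seen from an implementer's side, the interface after this change looks as follows. A minimal sketch: the imports mirror the packages used elsewhere in this thread, but SchedulingResult's exact package and construction are not shown here, so the success(...) factory below is a placeholder, not the real API:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.scheduler.ExecutorDetails;
    import org.apache.storm.scheduler.TopologyDetails;
    import org.apache.storm.scheduler.WorkerSlot;
    import org.apache.storm.scheduler.resource.ClusterStateData;
    import org.apache.storm.scheduler.resource.SchedulingResult;
    import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;

    public class SkeletonStrategy implements IStrategy {
        private ClusterStateData clusterStateData;

        @Override
        public void prepare(ClusterStateData clusterStateData) {
            // Cache the cluster view handed to the strategy before scheduling.
            this.clusterStateData = clusterStateData;
        }

        @Override
        public SchedulingResult schedule(TopologyDetails td) {
            // Key: target worker slot; value: the executors to run in that slot.
            Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap =
                    new HashMap<WorkerSlot, Collection<ExecutorDetails>>();
            // ... fill schedulerAssignmentMap for td's unassigned executors ...
            // Hypothetical factory; the real SchedulingResult API is not in this diff.
            return SchedulingResult.success(schedulerAssignmentMap);
        }
    }
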

[5/5] storm git commit: Merge branch 'STORM-1450' of https://github.com/jerrypeng/storm [STORM-1450] - Fix minor bugs and refactor code in ResourceAwareScheduler

Posted by je...@apache.org.
Merge branch 'STORM-1450' of https://github.com/jerrypeng/storm
[STORM-1450] - Fix minor bugs and refactor code in ResourceAwareScheduler


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e151625
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e151625
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e151625

Branch: refs/heads/master
Commit: 6e15162561805d678c30fd7dc0d80f9545669b33
Parents: c9d687e 0529410
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Jan 19 10:00:27 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Tue Jan 19 10:00:27 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  11 +-
 .../jvm/org/apache/storm/scheduler/Cluster.java |  54 ++-
 .../storm/scheduler/SchedulerAssignment.java    |   3 +
 .../scheduler/SchedulerAssignmentImpl.java      |  14 +
 .../apache/storm/scheduler/TopologyDetails.java |   2 +-
 .../org/apache/storm/scheduler/WorkerSlot.java  |  24 +-
 .../scheduler/resource/ClusterStateData.java    | 101 ++++++
 .../storm/scheduler/resource/RAS_Node.java      | 342 +++++++++----------
 .../storm/scheduler/resource/RAS_Nodes.java     | 122 +++----
 .../resource/ResourceAwareScheduler.java        |  44 ++-
 .../DefaultResourceAwareStrategy.java           | 135 ++++----
 .../strategies/scheduling/IStrategy.java        |  11 +-
 .../scheduler/resource_aware_scheduler_test.clj |   8 +-
 .../resource/TestResourceAwareScheduler.java    | 161 +++++++++
 14 files changed, 658 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e151625/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------


[3/5] storm git commit: deleting unnecessary code

Posted by je...@apache.org.
deleting unnecessary code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4722c6f5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4722c6f5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4722c6f5

Branch: refs/heads/master
Commit: 4722c6f5d467c147996a027d00baf867cf6bc54d
Parents: 6883669
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Jan 14 14:53:02 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 14:53:02 2016 -0600

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 79 --------------------
 1 file changed, 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4722c6f5/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index fae663f..c4c1b3b 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -945,76 +945,6 @@ public class TestResourceAwareScheduler {
      * If a user is above his or her guarantee, check if topology eviction works correctly
      */
     @Test
-    public void Test() {
-        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
-        Map<String, Number> resourceMap = new HashMap<String, Number>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
-        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
-        Config config = new Config();
-        config.putAll(Utils.readDefaultConfig());
-        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
-        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
-        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
-        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
-        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
-        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
-        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
-        resourceUserPool.put("jerry", new HashMap<String, Number>());
-        resourceUserPool.get("jerry").put("cpu", 70.0);
-        resourceUserPool.get("jerry").put("memory", 700.0);
-
-        resourceUserPool.put("bobby", new HashMap<String, Number>());
-        resourceUserPool.get("bobby").put("cpu", 100.0);
-        resourceUserPool.get("bobby").put("memory", 1000.0);
-
-        resourceUserPool.put("derek", new HashMap<String, Number>());
-        resourceUserPool.get("derek").put("cpu", 25.0);
-        resourceUserPool.get("derek").put("memory", 250.0);
-
-        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 2, 0, currentTime - 2, 10);
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 2, 0, currentTime - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 2, 0, currentTime - 2, 29);
-
-        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-        topoMap.put(topo1.getId(), topo1);
-        topoMap.put(topo3.getId(), topo3);
-        topoMap.put(topo4.getId(), topo4);
-        topoMap.put(topo5.getId(), topo5);
-
-        Topologies topologies = new Topologies(topoMap);
-
-        ResourceAwareScheduler rs = new ResourceAwareScheduler();
-
-        rs.prepare(config);
-        rs.schedule(topologies, cluster);
-
-        LOG.info("Assignments: {}", cluster.getAssignments());
-        for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
-            LOG.info("Topology id: {}", entry.getKey());
-            for(WorkerSlot target: entry.getValue().getSlots()) {
-                LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
-            }
-
-        }
-    }
-
-    /**
-     * If a user is above his or her guarantee, check if topology eviction works correctly
-     */
-    @Test
     public void TestOverGuaranteeEviction() {
         INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
         Map<String, Number> resourceMap = new HashMap<String, Number>();
@@ -1159,15 +1089,6 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
-
-        LOG.info("Assignments: {}", cluster.getAssignments());
-        for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
-            LOG.info("Topology id: {}", entry.getKey());
-            for(WorkerSlot target: entry.getValue().getSlots()) {
-                LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
-            }
-
-        }
     }
 
     /**


[2/5] storm git commit: adding an additional test and cleaning up

Posted by je...@apache.org.
adding an additional test and cleaning up


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68836690
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68836690
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68836690

Branch: refs/heads/master
Commit: 688366902d98a7fdc17d854a33ff975c6a60ccab
Parents: 829ea11
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Jan 13 14:13:54 2016 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Jan 14 14:25:07 2016 -0600

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   5 +-
 .../jvm/org/apache/storm/scheduler/Cluster.java |  54 ++++-
 .../apache/storm/scheduler/TopologyDetails.java |   2 +-
 .../org/apache/storm/scheduler/WorkerSlot.java  |  10 +-
 .../scheduler/resource/ClusterStateData.java    |   4 +-
 .../storm/scheduler/resource/RAS_Node.java      | 141 +++++------
 .../storm/scheduler/resource/RAS_Nodes.java     |  48 ----
 .../resource/ResourceAwareScheduler.java        |  43 +++-
 .../scheduler/resource_aware_scheduler_test.clj |   8 +-
 .../resource/TestResourceAwareScheduler.java    | 240 +++++++++++++++++++
 10 files changed, 388 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 99260d9..0555ed9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -847,12 +847,12 @@
         ;; call scheduler.schedule to schedule all the topologies
         ;; the new assignments for all the topologies are in the cluster object.
         _ (.schedule (:scheduler nimbus) topologies cluster)
-        _ (.setResourcesMap cluster @(:id->resources nimbus))
+        _ (.setTopologyResourcesMap cluster @(:id->resources nimbus))
         _ (if-not (conf SCHEDULER-DISPLAY-RESOURCE) (.updateAssignedMemoryForTopologyAndSupervisor cluster topologies))
         ;;merge with existing statuses
         _ (reset! (:id->sched-status nimbus) (merge (deref (:id->sched-status nimbus)) (.getStatusMap cluster)))
         _ (reset! (:node-id->resources nimbus) (.getSupervisorsResourcesMap cluster))
-        _ (reset! (:id->resources nimbus) (.getResourcesMap cluster))]
+        _ (reset! (:id->resources nimbus) (.getTopologyResourcesMap cluster))]
     (.getAssignments cluster)))
 
 (defn changed-executors [executor->node+port new-executor->node+port]
@@ -913,7 +913,6 @@
                                        existing-assignments
                                        topologies
                                        scratch-topology-id)
-
         topology->executor->node+port (compute-new-topology->executor->node+port new-scheduler-assignments existing-assignments)
 
         topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)

http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index 3650df2..4ac4eaf 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -57,7 +57,7 @@ public class Cluster {
     /**
      * key topologyId, Value: requested and assigned resources (e.g., on-heap/off-heap mem, cpu) for each topology.
      */
-    private Map<String, Double[]> resources;
+    private Map<String, Double[]> topologyResources;
 
     /**
      * a map from hostname to supervisor id.
@@ -76,7 +76,7 @@ public class Cluster {
         this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
         this.assignments.putAll(assignments);
         this.status = new HashMap<String, String>();
-        this.resources = new HashMap<String, Double[]>();
+        this.topologyResources = new HashMap<String, Double[]>();
         this.supervisorsResources = new HashMap<String, Double[]>();
         this.hostToId = new HashMap<String, List<String>>();
         for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
@@ -617,13 +617,13 @@ public class Cluster {
                     supervisorToAssignedMem.put(nodeId, assignedMemPerSlot);
                 }
             }
-            if (this.getResourcesMap().containsKey(topId)) {
-                Double[] topo_resources = getResourcesMap().get(topId);
+            if (this.getTopologyResourcesMap().containsKey(topId)) {
+                Double[] topo_resources = getTopologyResourcesMap().get(topId);
                 topo_resources[3] = assignedMemForTopology;
             } else {
                 Double[] topo_resources = {0.0, 0.0, 0.0, 0.0, 0.0, 0.0};
                 topo_resources[3] = assignedMemForTopology;
-                this.setResources(topId, topo_resources);
+                this.setTopologyResources(topId, topo_resources);
             }
         }
 
@@ -662,22 +662,50 @@ public class Cluster {
         this.status.putAll(statusMap);
     }
 
-    public void setResources(String topologyId, Double[] resources) {
-        this.resources.put(topologyId, resources);
+    /**
+     * Set the amount of resources used by a topology. Used for displaying resource information on the UI
+     * @param topologyId
+     * @param resources describes the resources requested and assigned to topology in the following format in an array:
+     *  {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+     */
+    public void setTopologyResources(String topologyId, Double[] resources) {
+        this.topologyResources.put(topologyId, resources);
     }
 
-    public void setResourcesMap(Map<String, Double[]> topologies_resources) {
-        this.resources.putAll(topologies_resources);
+    /**
+     * Set the amount of resources used by a topology. Used for displaying resource information on the UI
+     * @param topologyResources a map that contains multiple topologies and the resources the topology requested and assigned.
+     * Key: topology id Value: an array that describes the resources the topology requested and assigned in the following format:
+     *  {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+     */
+    public void setTopologyResourcesMap(Map<String, Double[]> topologyResources) {
+        this.topologyResources.putAll(topologyResources);
     }
 
-    public Map<String, Double[]> getResourcesMap() {
-        return this.resources;
+    /**
+     * Get the amount of resources used by topologies.  Used for displaying resource information on the UI
+     * @return  a map that contains multiple topologies and the resources the topology requested and assigned.
+     * Key: topology id Value: an array that describes the resources the topology requested and assigned in the following format:
+     *  {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+     */
+    public Map<String, Double[]> getTopologyResourcesMap() {
+        return this.topologyResources;
     }
 
-    public void setSupervisorsResourcesMap(Map<String, Double[]> supervisors_resources) {
-        this.supervisorsResources.putAll(supervisors_resources);
+    /**
+     * Sets the amount of used and free resources on a supervisor. Used for displaying resource information on the UI
+     * @param supervisorResources a map where the key is the supervisor id and the value is a map that represents
+     * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem, usedCpu}
+     */
+    public void setSupervisorsResourcesMap(Map<String, Double[]> supervisorResources) {
+        this.supervisorsResources.putAll(supervisorResources);
     }
 
+    /**
+     * Get the amount of used and free resources on a supervisor.  Used for displaying resource information on the UI
+     * @return  a map where the key is the supervisor id and the value is a map that represents
+     * resource usage for a supervisor in the following format: {totalMem, totalCpu, usedMem, usedCpu}
+     */
     public Map<String, Double[]> getSupervisorsResourcesMap() {
         return this.supervisorsResources;
     }

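The positional Double[] layouts documented above are easy to mix up, so here is a self-contained sketch of both conventions (all figures invented):

    import java.util.HashMap;
    import java.util.Map;

    public class ResourceArrayFormats {
        public static void main(String[] args) {
            // Per topology: {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
            //                assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
            Map<String, Double[]> topologyResources = new HashMap<String, Double[]>();
            topologyResources.put("topo-1", new Double[] {512.0, 256.0, 100.0, 512.0, 256.0, 100.0});

            // Per supervisor: {totalMem, totalCpu, usedMem, usedCpu}
            Map<String, Double[]> supervisorResources = new HashMap<String, Double[]>();
            supervisorResources.put("sup-1", new Double[] {2048.0, 400.0, 768.0, 150.0});

            // Index 3 is assignedMemOnHeap, matching topo_resources[3] in the diff above.
            System.out.println(topologyResources.get("topo-1")[3]); // 512.0
        }
    }
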
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
index ea5bfad..a0eb4ad 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/TopologyDetails.java
@@ -420,7 +420,7 @@ public class TopologyDetails {
     /**
      * Add default resource requirements for a executor
      */
-    public void addDefaultResforExec(ExecutorDetails exec) {
+    private void addDefaultResforExec(ExecutorDetails exec) {
         Double topologyComponentCpuPcorePercent = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null);
         Double topologyComponentResourcesOffheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
         Double topologyComponentResourcesOnheapMemoryMb = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);

http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 57cd9c5..423764d 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -18,14 +18,14 @@
 package org.apache.storm.scheduler;
 
 public class WorkerSlot {
-    protected String nodeId;
-    protected int port;
+    private String nodeId;
+    private int port;
     // amount of on-heap memory allocated to it
-    protected double memOnHeap = 0.0;
+    private double memOnHeap = 0.0;
     // amount of off-heap memory allocated to it
-    protected double memOffHeap = 0.0;
+    private double memOffHeap = 0.0;
     // amount of cpu allocated to it
-    protected double cpu = 0.0;
+    private double cpu = 0.0;
     
     public WorkerSlot(String nodeId, Number port) {
         this(nodeId, port, 0.0, 0.0, 0.0);

http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
index 0fffe93..ece2800 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- *  A class to specify which data and API to expose to a scheduling strategy
+ * A class to specify which data and API to expose to a scheduling strategy
  */
 public class ClusterStateData {
 
@@ -86,7 +86,7 @@ public class ClusterStateData {
         this.cluster = cluster;
         this.topologies = topologies;
         Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
-        for(Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
+        for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
             this.nodes.put(entry.getKey(), new NodeDetails(entry.getValue()));
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
index 3e10085..e33122b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -64,18 +64,10 @@ public class RAS_Node {
     public RAS_Node(String nodeId, SupervisorDetails sup, Cluster cluster, Topologies topologies, Map<String, WorkerSlot> workerIdToWorker, Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
         //Node ID and supervisor ID are the same.
         _nodeId = nodeId;
-        _isAlive = !cluster.isBlackListed(_nodeId);
-
-        //check if node is alive
-        if (_isAlive && sup != null) {
-            _hostname = sup.getHost();
-            _sup = sup;
-            _availMemory = getTotalMemoryResources();
-            _availCPU = getTotalCpuResources();
-
-            LOG.debug("Found a {} Node {} {}",
-                    _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
-            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
+        if (sup == null) {
+            _isAlive = false;
+        } else {
+            _isAlive = !cluster.isBlackListed(_nodeId);
         }
 
         _cluster = cluster;
@@ -90,6 +82,45 @@ public class RAS_Node {
         if (assignmentMap != null) {
             _topIdToUsedSlots = assignmentMap;
         }
+
+        //check if node is alive
+        if (_isAlive && sup != null) {
+            _hostname = sup.getHost();
+            _sup = sup;
+            _availMemory = getTotalMemoryResources();
+            _availCPU = getTotalCpuResources();
+
+            LOG.debug("Found a {} Node {} {}",
+                    _isAlive ? "living" : "dead", _nodeId, sup.getAllPorts());
+            LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
+            //intialize resource usages on node
+            intializeResources();
+        }
+    }
+
+    /**
+     * intializes resource usages on node
+     */
+    private void intializeResources() {
+        for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry : _topIdToUsedSlots.entrySet()) {
+            String topoId = entry.getKey();
+            Map<String, Collection<ExecutorDetails>> assignment = entry.getValue();
+            Map<ExecutorDetails, Double> topoMemoryResourceList = _topologies.getById(topoId).getTotalMemoryResourceList();
+            for (Collection<ExecutorDetails> execs : assignment.values()) {
+                for (ExecutorDetails exec : execs) {
+                    if (!_isAlive) {
+                        continue;
+                        // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
+                        // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
+                    }
+                    if (topoMemoryResourceList.containsKey(exec)) {
+                        consumeResourcesforTask(exec, _topologies.getById(topoId));
+                    } else {
+                        throw new IllegalStateException("Executor " + exec + " not found!");
+                    }
+                }
+            }
+        }
     }
 
     public String getId() {
@@ -109,7 +140,7 @@ public class RAS_Node {
     }
 
     public Collection<String> getFreeSlotsId() {
-        if(!_isAlive) {
+        if (!_isAlive) {
             return new HashSet<String>();
         }
         Collection<String> usedSlotsId = getUsedSlotsId();
@@ -200,22 +231,21 @@ public class RAS_Node {
             throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
         }
 
-        //free slot
-        _cluster.freeSlot(ws);
-
-        //cleanup internal assignments
-        _topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
-
         double memUsed = getMemoryUsedByWorker(ws);
         double cpuUsed = getCpuUsedByWorker(ws);
         freeMemory(memUsed);
         freeCPU(cpuUsed);
+
+        //free slot
+        _cluster.freeSlot(ws);
+        //cleanup internal assignments
+        _topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
     }
 
     private void freeMemory(double amount) {
         LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, getHostname(), _availMemory);
-        if((_availMemory + amount) > getTotalCpuResources()) {
-            LOG.warn("Freeing more memory than there exists!");
+        if ((_availMemory + amount) > getTotalMemoryResources()) {
+            LOG.warn("Freeing more memory than there exists! Memory trying to free: {} Total memory on Node: {}", (_availMemory + amount), getTotalMemoryResources());
             return;
         }
         _availMemory += amount;
@@ -223,8 +253,8 @@ public class RAS_Node {
 
     private void freeCPU(double amount) {
         LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, getHostname(), _availCPU);
-        if ((_availCPU + amount) > getAvailableCpuResources()) {
-            LOG.warn("Freeing more CPU than there exists!");
+        if ((_availCPU + amount) > getTotalCpuResources()) {
+            LOG.warn("Freeing more CPU than there exists! CPU trying to free: {} Total CPU on Node: {}", (_availMemory + amount), getTotalCpuResources());
             return;
         }
         _availCPU += amount;
@@ -239,6 +269,7 @@ public class RAS_Node {
             return 0.0;
         }
         Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
+        LOG.info("getMemoryUsedByWorker executors: {}", execs);
         double totalMemoryUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);
@@ -280,54 +311,6 @@ public class RAS_Node {
     }
 
     /**
-     * Allocate Mem and CPU resources to the assigned slot for the topology's executors.
-     * @param td the TopologyDetails that the slot is assigned to.
-     * @param executors the executors to run in that slot.
-     * @param slot the slot to allocate resource to
-     */
-//    private void allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
-//        double onHeapMem = 0.0;
-//        double offHeapMem = 0.0;
-//        double cpu = 0.0;
-//        for (ExecutorDetails exec : executors) {
-//            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
-//            if (onHeapMemForExec != null) {
-//                onHeapMem += onHeapMemForExec;
-//            }
-//            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
-//            if (offHeapMemForExec != null) {
-//                offHeapMem += offHeapMemForExec;
-//            }
-//            Double cpuForExec = td.getTotalCpuReqTask(exec);
-//            if (cpuForExec != null) {
-//                cpu += cpuForExec;
-//            }
-//        }
-//        slot.allocateResource(onHeapMem, offHeapMem, cpu);
-//    }
-
-    private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
-        double onHeapMem = 0.0;
-        double offHeapMem = 0.0;
-        double cpu = 0.0;
-        for (ExecutorDetails exec : executors) {
-            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
-            if (onHeapMemForExec != null) {
-                onHeapMem += onHeapMemForExec;
-            }
-            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
-            if (offHeapMemForExec != null) {
-                offHeapMem += offHeapMemForExec;
-            }
-            Double cpuForExec = td.getTotalCpuReqTask(exec);
-            if (cpuForExec != null) {
-                cpu += cpuForExec;
-            }
-        }
-        return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
-    }
-
-    /**
      * Assigns a worker to a node
      * @param target the worker slot to assign the executors
      * @param td the topology the executors are from
@@ -350,7 +333,8 @@ public class RAS_Node {
         if (!freeSlots.contains(target)) {
             throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
         }
-        target = allocateResourceToSlot(td, executors, target);
+        LOG.info("target slot: {}", target);
+
         _cluster.assign(target, td.getId(), executors);
 
         //assigning internally
@@ -364,15 +348,6 @@ public class RAS_Node {
         _topIdToUsedSlots.get(td.getId()).get(target.getId()).addAll(executors);
     }
 
-    /**
-     * Assign a free slot on the node to the following topology and executors.
-     * @param td the TopologyDetails to assign a free slot to.
-     * @param executors the executors to run in that slot.
-     */
-    public void assign(TopologyDetails td, Collection<ExecutorDetails> executors) {
-        assign(null, td, executors);
-    }
-
     @Override
     public boolean equals(Object other) {
         if (other instanceof RAS_Node) {
@@ -487,7 +462,7 @@ public class RAS_Node {
     public Double consumeMemory(Double amount) {
         if (amount > _availMemory) {
             LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
-            return null;
+            throw new IllegalStateException("Attempting to consume more memory than available");
         }
         _availMemory = _availMemory - amount;
         return _availMemory;
@@ -524,7 +499,7 @@ public class RAS_Node {
     public Double consumeCPU(Double amount) {
         if (amount > _availCPU) {
             LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
-            return null;
+            throw new IllegalStateException("Attempting to consume more memory than available");
         }
         _availCPU = _availCPU - amount;
         return _availCPU;

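Two behavioral changes above are worth calling out: freeMemory/freeCPU now compare against the node's total (not, mistakenly, the other resource or the available amount), and consumeMemory/consumeCPU now throw instead of returning null. A stripped-down model of that accounting, illustrative only (names mirror the diff, the class itself is not Storm code):

    public class NodeResourceAccounting {
        private final double totalMemory;
        private double availMemory;

        public NodeResourceAccounting(double totalMemory) {
            this.totalMemory = totalMemory;
            this.availMemory = totalMemory;
        }

        public double consumeMemory(double amount) {
            if (amount > availMemory) {
                // Mirrors the diff: fail fast rather than return null.
                throw new IllegalStateException("Attempting to consume more memory than available");
            }
            availMemory -= amount;
            return availMemory;
        }

        public void freeMemory(double amount) {
            if (availMemory + amount > totalMemory) {
                // Mirrors the diff: warn and ignore an over-free.
                System.err.println("Freeing more memory than there exists!");
                return;
            }
            availMemory += amount;
        }

        public static void main(String[] args) {
            NodeResourceAccounting node = new NodeResourceAccounting(2048.0);
            node.consumeMemory(500.0); // avail: 1548.0
            node.freeMemory(500.0);    // avail: 2048.0
            node.freeMemory(1.0);      // warns: would exceed the 2048.0 total
        }
    }
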
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 389a63c..00795a3 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -98,58 +98,10 @@ public class RAS_Nodes {
                 nodeIdToNode.put(nodeId, new RAS_Node(nodeId, null, cluster, topologies, workerIdToWorker.get(nodeId), assignments));
             }
         }
-
-        updateAvailableResources(cluster, topologies, nodeIdToNode);
         return nodeIdToNode;
     }
 
     /**
-     * updates the available resources for every node in a cluster
-     * by recalculating memory requirements.
-     *
-     * @param cluster      the cluster used in this calculation
-     * @param topologies   container of all topologies
-     * @param nodeIdToNode a map between node id and node
-     */
-    private static void updateAvailableResources(Cluster cluster,
-                                                 Topologies topologies,
-                                                 Map<String, RAS_Node> nodeIdToNode) {
-        //recompute memory
-        if (cluster.getAssignments().size() > 0) {
-            for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
-                    .entrySet()) {
-                Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
-                        .getExecutorToSlot();
-                Map<ExecutorDetails, Double> topoMemoryResourceList = topologies.getById(entry.getKey()).getTotalMemoryResourceList();
-
-                if (topoMemoryResourceList == null || topoMemoryResourceList.size() == 0) {
-                    continue;
-                }
-                for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
-                        .entrySet()) {
-                    WorkerSlot slot = execToSlot.getValue();
-                    ExecutorDetails exec = execToSlot.getKey();
-                    RAS_Node node = nodeIdToNode.get(slot.getNodeId());
-                    if (!node.isAlive()) {
-                        continue;
-                        // We do not free the assigned slots (the orphaned slots) on the inactive supervisors
-                        // The inactive node will be treated as a 0-resource node and not available for other unassigned workers
-                    }
-                    if (topoMemoryResourceList.containsKey(exec)) {
-                        node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
-                    } else {
-                        throw new IllegalStateException("Executor " + exec + "not found!");
-                    }
-                }
-            }
-        } else {
-            for (RAS_Node n : nodeIdToNode.values()) {
-                n.setAvailableMemory(n.getAvailableMemoryResources());
-            }
-        }
-    }
-
-    /**
      * get node object from nodeId
      */
     public RAS_Node getNodeById(String nodeId) {

http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index a913e69..2b35d6b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -33,7 +33,6 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -121,8 +120,10 @@ public class ResourceAwareScheduler implements IScheduler {
             }
             scheduleTopology(td);
 
-            LOG.info("Nodes after scheduling:{}", this.nodes);
+            LOG.debug("Nodes after scheduling:\n{}", this.nodes);
         }
+        //updating resources used by supervisor
+        updateSupervisorsResources(this.cluster, this.topologies);
     }
 
     public void scheduleTopology(TopologyDetails td) {
@@ -255,12 +256,17 @@ public class ResourceAwareScheduler implements IScheduler {
                 WorkerSlot targetSlot = workerToTasksEntry.getKey();
                 Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
                 RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
+
+                targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
+
                 targetNode.assign(targetSlot, td, execsNeedScheduling);
+
+                LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
+                        td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
+
                 for (ExecutorDetails exec : execsNeedScheduling) {
                     targetNode.consumeResourcesforTask(exec, td);
                 }
-                LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
-                        td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
                 if (!nodesUsed.contains(targetNode.getId())) {
                     nodesUsed.add(targetNode.getId());
                 }
@@ -271,12 +277,12 @@ public class ResourceAwareScheduler implements IScheduler {
 
             Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
-            LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
+            LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
                             "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
                     td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
-            this.cluster.setResources(td.getId(), resources);
-            updateSupervisorsResources(this.cluster, this.topologies);
+            //updating resources used for a topology
+            this.cluster.setTopologyResources(td.getId(), resources);
             return true;
         } else {
             LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
@@ -284,6 +290,27 @@ public class ResourceAwareScheduler implements IScheduler {
         }
     }
 
+    private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
+        double onHeapMem = 0.0;
+        double offHeapMem = 0.0;
+        double cpu = 0.0;
+        for (ExecutorDetails exec : executors) {
+            Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
+            if (onHeapMemForExec != null) {
+                onHeapMem += onHeapMemForExec;
+            }
+            Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
+            if (offHeapMemForExec != null) {
+                offHeapMem += offHeapMemForExec;
+            }
+            Double cpuForExec = td.getTotalCpuReqTask(exec);
+            if (cpuForExec != null) {
+                cpu += cpuForExec;
+            }
+        }
+        return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu);
+    }
+
     private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
         Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>();
         Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
@@ -401,7 +428,7 @@ public class ResourceAwareScheduler implements IScheduler {
         this.cluster.setAssignments(schedulingState.cluster.getAssignments());
         this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
         this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
-        this.cluster.setResourcesMap(schedulingState.cluster.getResourcesMap());
+        this.cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
         //don't need to explicitly set data structures like Cluster since nothing can really be changed
         //unless this.topologies is set to another object
         this.topologies = schedulingState.topologies;

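allocateResourceToSlot, moved here from RAS_Node, simply sums each executor's on-heap, off-heap, and CPU requirements and rebuilds the slot with those totals. A worked example using the per-component defaults from the tests in this thread (250 MB on-heap, 250 MB off-heap, 50.0 CPU), with two executors landing in one slot:

    public class SlotAllocationArithmetic {
        public static void main(String[] args) {
            double onHeapMem = 250.0 + 250.0;   // 500.0 MB across both executors
            double offHeapMem = 250.0 + 250.0;  // 500.0 MB
            double cpu = 50.0 + 50.0;           // 100.0 (percent of a core)
            // The scheduler then swaps in a slot carrying these totals:
            // new WorkerSlot(slot.getNodeId(), slot.getPort(), 500.0, 500.0, 100.0)
            System.out.println(onHeapMem + " / " + offHeapMem + " / " + cpu);
        }
    }
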
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
index 874363d..ec51914 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
@@ -112,25 +112,25 @@
       (is (= 4 (.totalSlotsFree node)))
       (is (= 0 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology1 (list (ExecutorDetails. 1 1)))
+      (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 1 1)))
       (is (= 1 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 3 (.totalSlotsFree node)))
       (is (= 1 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology1 (list (ExecutorDetails. 2 2)))
+      (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 2 2)))
       (is (= 1 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 2 (.totalSlotsFree node)))
       (is (= 2 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology2 (list (ExecutorDetails. 1 1)))
+      (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 1 1)))
       (is (= 2 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 1 (.totalSlotsFree node)))
       (is (= 3 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology2 (list (ExecutorDetails. 2 2)))
+      (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 2 2)))
       (is (= 2 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 0 (.totalSlotsFree node)))

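These test updates track the removal of the one-argument assign overload from RAS_Node: callers must now choose a free slot themselves. The Java equivalent of the updated Clojure call, as a fragment rather than a full program (node and topology1 come from test setup):

    import java.util.Arrays;
    import org.apache.storm.scheduler.ExecutorDetails;
    import org.apache.storm.scheduler.WorkerSlot;

    // Pick any free slot explicitly, then assign executors to it.
    WorkerSlot free = node.getFreeSlots().iterator().next();
    node.assign(free, topology1, Arrays.asList(new ExecutorDetails(1, 1)));
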
http://git-wip-us.apache.org/repos/asf/storm/blob/68836690/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 1e0f6ed..fae663f 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.storm.scheduler.resource;
 
 import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
@@ -28,8 +29,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -37,9 +40,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 public class TestResourceAwareScheduler {
 
     private final String TOPOLOGY_SUBMITTER = "jerry";
@@ -939,6 +945,76 @@ public class TestResourceAwareScheduler {
      * If a user is above his or her guarantee, check if topology eviction works correctly
      */
     @Test
+    public void Test() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 70.0);
+        resourceUserPool.get("jerry").put("memory", 700.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 100.0);
+        resourceUserPool.get("bobby").put("memory", 1000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 25.0);
+        resourceUserPool.get("derek").put("memory", 250.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 2, 0, currentTime - 2, 10);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 2, 0, currentTime - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 2, 0, currentTime - 2, 29);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        LOG.info("Assignments: {}", cluster.getAssignments());
+        for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+            LOG.info("Topology id: {}", entry.getKey());
+            for(WorkerSlot target: entry.getValue().getSlots()) {
+                LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
+            }
+
+        }
+    }
+
+    /**
+     * If a user is above his or her guarantee, check if topology eviction works correctly
+     */
+    @Test
     public void TestOverGuaranteeEviction() {
         INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
         Map<String, Number> resourceMap = new HashMap<String, Number>();
@@ -1083,6 +1159,15 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+        LOG.info("Assignments: {}", cluster.getAssignments());
+        LOG.info("Assignments: {}", cluster.getAssignments());
+        for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
+            LOG.info("Topology id: {}", entry.getKey());
+            for (WorkerSlot target : entry.getValue().getSlots()) {
+                LOG.info("target resources onheap: {} offheap: {} cpu: {}", target.getAllocatedMemOnHeap(), target.getAllocatedMemOffHeap(), target.getAllocatedCpu());
+            }
+        }
     }
 
     /**
@@ -1224,4 +1309,159 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
         Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
     }
+
+    /**
+     * test that the scheduling logic of the DefaultResourceAwareStrategy is correct
+     */
+    @Test
+    public void testDefaultResourceAwareStrategy() {
+        int spoutParallelism = 1;
+        int boltParallelism = 2;
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new TestUtilsForResourceAwareScheduler.TestSpout(),
+                spoutParallelism);
+        builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("spout");
+        builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("bolt-1");
+        builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                boltParallelism).shuffleGrouping("bolt-2");
+
+        StormTopology stormTopology = builder.createTopology();
+
+        Config conf = new Config();
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 150.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1500.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        conf.putAll(Utils.readDefaultConfig());
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 50.0);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 250);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 250);
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_NAME, "testTopology");
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+        TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormTopology, 0,
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, spoutParallelism, boltParallelism),
+                this.currentTime);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo.getId(), topo);
+        Topologies topologies = new Topologies(topoMap);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(conf);
+        rs.schedule(topologies, cluster);
+
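+        // build a map from node id to the components scheduled on that node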
+        Map<String, List<String>> nodeToComps = new HashMap<String, List<String>>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignments().get("testTopology-id").getExecutorToSlot().entrySet()) {
+            WorkerSlot ws = entry.getValue();
+            ExecutorDetails exec = entry.getKey();
+            if (!nodeToComps.containsKey(ws.getNodeId())) {
+                nodeToComps.put(ws.getNodeId(), new LinkedList<String>());
+            }
+            nodeToComps.get(ws.getNodeId()).add(topo.getExecutorToComponent().get(exec));
+        }
+
+        /**
+         * Check for correct scheduling.
+         * Since the resource availabilities of all nodes are the same in the beginning,
+         * DefaultResourceAwareStrategy can arbitrarily pick one, so we must check whether a
+         * particular scheduling exists on some node in the cluster.
+         */
+
+        //one node should have the below scheduling
+        List<String> node1 = new LinkedList<>();
+        node1.add("spout");
+        node1.add("bolt-1");
+        node1.add("bolt-2");
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node1));
+
+        //one node should have the below scheduling
+        List<String> node2 = new LinkedList<>();
+        node2.add("bolt-3");
+        node2.add("bolt-1");
+        node2.add("bolt-2");
+
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node2));
+
+        //one node should have the below scheduling
+        List<String> node3 = new LinkedList<>();
+        node3.add("bolt-3");
+
+        Assert.assertTrue("Check DefaultResourceAwareStrategy scheduling", checkDefaultStrategyScheduling(nodeToComps, node3));
+
+        //three nodes should be used and one should be left empty
+        Assert.assertEquals("only three nodes should be used", 3, nodeToComps.size());
+    }
+
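+    /**
+     * Returns true if any node in nodeToComps was assigned exactly the components listed in
+     * schedulingToFind, comparing the two lists as sets (order and duplicates are ignored).
+     */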
+    private boolean checkDefaultStrategyScheduling(Map<String, List<String>> nodeToComps, List<String> schedulingToFind) {
+        for (List<String> entry : nodeToComps.values()) {
+            if (schedulingToFind.containsAll(entry) && entry.containsAll(schedulingToFind)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * test that freeing slots on nodes works correctly
+     */
+    @Test
+    public void TestNodeFreeSlot() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 2, 0, currentTime - 2, 29);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 2, 0, currentTime - 2, 10);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
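+        // wrap the scheduled cluster state in RAS_Node objects to inspect per-node resource availability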
+        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+
+        for (SchedulerAssignment entry : cluster.getAssignments().values()) {
+            for (WorkerSlot ws : entry.getSlots()) {
+                double memoryBefore = nodes.get(ws.getNodeId()).getAvailableMemoryResources();
+                double cpuBefore = nodes.get(ws.getNodeId()).getAvailableCpuResources();
+                double memoryUsedByWorker = nodes.get(ws.getNodeId()).getMemoryUsedByWorker(ws);
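+                // with the component settings above (500 MB on-heap + 500 MB off-heap and 100% CPU per
+                // executor), a worker holding a single executor is expected to use 1000 MB and 100% CPU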
+                Assert.assertEquals("Check if memory used by worker is calculated correctly", 1000.0, memoryUsedByWorker, 0.001);
+                double cpuUsedByWorker = nodes.get(ws.getNodeId()).getCpuUsedByWorker(ws);
+                Assert.assertEquals("Check if CPU used by worker is calculated correctly", 100.0, cpuUsedByWorker, 0.001);
+                nodes.get(ws.getNodeId()).free(ws);
+                double memoryAfter = nodes.get(ws.getNodeId()).getAvailableMemoryResources();
+                double cpuAfter = nodes.get(ws.getNodeId()).getAvailableCpuResources();
+                Assert.assertEquals("Check if free correctly frees amount of memory", memoryBefore + memoryUsedByWorker,  memoryAfter, 0.001);
+                Assert.assertEquals("Check if free correctly frees amount of memory", cpuBefore + cpuUsedByWorker,  cpuAfter, 0.001);
+                Assert.assertFalse("Check if worker was removed from assignments", entry.getSlotToExecutors().containsKey(ws));
+            }
+        }
+    }
 }