You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:34 UTC

[06/23] storm git commit: adding checkpointing

adding checkpointing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d3c864c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d3c864c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d3c864c

Branch: refs/heads/master
Commit: 9d3c864cdc708d62a088a1b9b23904bd6ea1b276
Parents: 3f55fee
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Nov 18 11:09:45 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 13:07:00 2015 -0600

----------------------------------------------------------------------
 .../jvm/backtype/storm/scheduler/Cluster.java   |   8 +
 .../backtype/storm/scheduler/Topologies.java    |   4 +
 .../storm/scheduler/resource/RAS_Node.java      |  77 +++-
 .../storm/scheduler/resource/RAS_Nodes.java     |  29 +-
 .../resource/ResourceAwareScheduler.java        | 236 ++++++-----
 .../storm/scheduler/resource/ResourceUtils.java |   9 +
 .../backtype/storm/scheduler/resource/User.java |  52 ++-
 .../storm/scheduler/resource/Experiment.java    | 257 +++++++-----
 .../resource/TestResourceAwareScheduler.java    | 400 ++++++++++++++++++-
 .../TestUtilsForResourceAwareScheduler.java     |  28 ++
 10 files changed, 862 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index 7f16b86..2676af1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -90,6 +90,14 @@ public class Cluster {
         }
         this.conf = storm_conf;
     }
+
+    public Cluster getCopy() {
+        Cluster copy = new Cluster(this.inimbus, this.supervisors, this.assignments, this.conf);
+        for(Map.Entry<String, String> entry : this.status.entrySet()) {
+            copy.setStatus(entry.getKey(), entry.getValue());
+        }
+        return copy;
+    }
     
     public void setBlacklistedHosts(Set<String> hosts) {
         blackListedHosts = hosts;

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 0b4c0ca..59a53c8 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -68,6 +68,10 @@ public class Topologies {
         return _allComponents;
     }
 
+    public Topologies getCopy() {
+        return new Topologies(this.topologies);
+    }
+
     @Override
     public String toString() {
         String ret = "Topologies:\n";

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index b0bcc8a..21367f0 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -55,9 +55,10 @@ public class RAS_Node {
     private Double _availCPU;
     private List<WorkerSlot> _slots;
     private Cluster _cluster;
+    private Topologies _topologies;
 
     public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
-                    SupervisorDetails sup, Cluster cluster) {
+                    SupervisorDetails sup, Cluster cluster, Topologies topologies) {
         _slots = new ArrayList<WorkerSlot>();
         _nodeId = nodeId;
         _isAlive = isAlive;
@@ -72,6 +73,7 @@ public class RAS_Node {
             _slots.addAll(_freeSlots);
         }
         this._cluster = cluster;
+        this._topologies = topologies;
     }
 
     public String getId() {
@@ -185,6 +187,8 @@ public class RAS_Node {
         }
         for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             _cluster.freeSlots(entry.getValue());
+            _availCPU = this.getTotalCpuResources();
+            _availMemory = this.getAvailableMemoryResources();
             if (_isAlive) {
                 _freeSlots.addAll(entry.getValue());
             }
@@ -197,14 +201,19 @@ public class RAS_Node {
      * @param ws the slot to free
      */
     public void free(WorkerSlot ws) {
+        LOG.info("freeing ws {} on node {}", ws, _hostname);
         if (_freeSlots.contains(ws)) return;
         for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             Set<WorkerSlot> slots = entry.getValue();
+            double memUsed = this.getMemoryUsedByWorker(ws);
+            double cpuUsed = this.getCpuUsedByWorker(ws);
             if (slots.remove(ws)) {
                 _cluster.freeSlot(ws);
                 if (_isAlive) {
                     _freeSlots.add(ws);
                 }
+                this.freeMemory(memUsed);
+                this.freeCPU(cpuUsed);
                 return;
             }
         }
@@ -223,6 +232,8 @@ public class RAS_Node {
         }
         for (WorkerSlot ws : slots) {
             _cluster.freeSlot(ws);
+            this.freeMemory(this.getMemoryUsedByWorker(ws));
+            this.freeCPU(this.getCpuUsedByWorker(ws));
             if (_isAlive) {
                 _freeSlots.add(ws);
             }
@@ -230,6 +241,66 @@ public class RAS_Node {
         _topIdToUsedSlots.remove(topId);
     }
 
+    public void freeMemory(double amount) {
+        _availMemory += amount;
+        LOG.info("freeing {} memory...avail mem: {}", amount, _availMemory);
+        if(_availMemory > this.getTotalMemoryResources()) {
+            LOG.warn("Freeing more memory than there exists!");
+        }
+    }
+
+    public void freeCPU(double amount) {
+        _availCPU += amount;
+        LOG.info("freeing {} CPU...avail CPU: {}", amount, _availCPU);
+        if(_availCPU > this.getAvailableCpuResources()) {
+            LOG.warn("Freeing more memory than there exists!");
+        }
+    }
+
+    public double getMemoryUsedByWorker(WorkerSlot ws) {
+        TopologyDetails topo = this.findTopologyUsingWorker(ws);
+        LOG.info("Topology {} using worker {}", topo, ws);
+        if(topo == null) {
+            return 0.0;
+        }
+        Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+        LOG.info("Worker {} has execs: {}", ws, execs);
+        double totalMemoryUsed = 0.0;
+        for(ExecutorDetails exec : execs) {
+            totalMemoryUsed += topo.getTotalMemReqTask(exec);
+        }
+        return totalMemoryUsed;
+    }
+
+    public double getCpuUsedByWorker(WorkerSlot ws) {
+        TopologyDetails topo = this.findTopologyUsingWorker(ws);
+        LOG.info("Topology {} using worker {}", topo, ws);
+        if(topo == null) {
+            return 0.0;
+        }
+        Collection<ExecutorDetails> execs = this.getExecutors(ws, this._cluster);
+        LOG.info("Worker {} has execs: {}", ws, execs);
+        double totalCpuUsed = 0.0;
+        for(ExecutorDetails exec : execs) {
+            totalCpuUsed += topo.getTotalCpuReqTask(exec);
+        }
+        return totalCpuUsed;
+    }
+
+    public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
+        for(Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+            LOG.info("topoId: {} workers: {}", entry.getKey(), entry.getValue());
+            String topoId = entry.getKey();
+            Set<WorkerSlot> workers = entry.getValue();
+            for (WorkerSlot worker : workers) {
+                if(worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
+                    return _topologies.getById(topoId);
+                }
+            }
+        }
+        return null;
+    }
+
     /**
      * Allocate Mem and CPU resources to the assigned slot for the topology's executors.
      * @param td the TopologyDetails that the slot is assigned to.
@@ -457,4 +528,8 @@ public class RAS_Node {
         this.consumeCPU(taskCpuReq);
         this.consumeMemory(taskMemReq);
     }
+
+    public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
+        return _topIdToUsedSlots;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
index 9df1475..42fc236 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Nodes.java
@@ -56,7 +56,7 @@ public class RAS_Nodes {
             LOG.debug("Found a {} Node {} {}",
                     isAlive ? "living" : "dead", id, sup.getAllPorts());
             LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
-            nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster));
+            nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup, cluster, topologies));
         }
         for (Map.Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
             String topId = entry.getValue().getTopologyId();
@@ -66,7 +66,7 @@ public class RAS_Nodes {
                 if (node == null) {
                     LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
                             workerSlot, RAS_Node.getExecutors(workerSlot, cluster));
-                    node = new RAS_Node(id, null, false, null, cluster);
+                    node = new RAS_Node(id, null, false, null, cluster, topologies);
                     nodeIdToNode.put(id, node);
                 }
                 if (!node.isAlive()) {
@@ -86,8 +86,9 @@ public class RAS_Nodes {
     /**
      * updates the available resources for every node in a cluster
      * by recalculating memory requirements.
-     * @param cluster the cluster used in this calculation
-     * @param topologies container of all topologies
+     *
+     * @param cluster      the cluster used in this calculation
+     * @param topologies   container of all topologies
      * @param nodeIdToNode a map between node id and node
      */
     private static void updateAvailableResources(Cluster cluster,
@@ -138,12 +139,26 @@ public class RAS_Nodes {
     }
 
     public void freeSlots(Collection<WorkerSlot> workerSlots) {
-        for(RAS_Node node : nodeMap.values()) {
-            for(WorkerSlot ws : node.getUsedSlots()) {
-                if(workerSlots.contains(ws)) {
+        for (RAS_Node node : nodeMap.values()) {
+            for (WorkerSlot ws : node.getUsedSlots()) {
+                if (workerSlots.contains(ws)) {
+                    LOG.info("freeing ws {} on node {}", ws, node);
                     node.free(ws);
                 }
             }
         }
     }
+
+    public Collection<RAS_Node> getNodes() {
+        return this.nodeMap.values();
+    }
+
+    @Override
+    public String toString() {
+        String ret = "";
+        for (RAS_Node node : this.nodeMap.values()) {
+            ret += node.toString() + "\n";
+        }
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 279d060..934858e 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -45,6 +45,27 @@ public class ResourceAwareScheduler implements IScheduler {
     private Topologies topologies;
     private RAS_Nodes nodes;
 
+    private class SchedulingState {
+        private Map<String, User> userMap = new HashMap<String, User>();
+        private Cluster cluster;
+        private Topologies topologies;
+        private RAS_Nodes nodes;
+        private Map conf = new Config();
+
+        public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
+            for(Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+                String userId = userMapEntry.getKey();
+                User user = userMapEntry.getValue();
+                this.userMap.put(userId, user.getCopy());
+            }
+            this.cluster = cluster.getCopy();
+            this.topologies = topologies.getCopy();
+            this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+            this.conf.putAll(conf);
+
+        }
+    }
+
 
     @SuppressWarnings("rawtypes")
     private Map conf;
@@ -67,27 +88,35 @@ public class ResourceAwareScheduler implements IScheduler {
         this.initialize(topologies, cluster);
 
         LOG.info("UserMap:\n{}", this.userMap);
-        for(User user : this.getUserMap().values()) {
+        for (User user : this.getUserMap().values()) {
             LOG.info(user.getDetailedInfo());
         }
 
-        for(TopologyDetails topo : topologies.getTopologies()) {
+        for (TopologyDetails topo : topologies.getTopologies()) {
             LOG.info("topo {} status: {}", topo, cluster.getStatusMap().get(topo.getId()));
         }
 
+        LOG.info("Nodes:\n{}", this.nodes);
+
         LOG.info("getNextUser: {}", this.getNextUser());
 
-        while(true) {
+        while (true) {
+            LOG.info("/*********** next scheduling iteration **************/");
+
             User nextUser = this.getNextUser();
-            if(nextUser == null){
+            if (nextUser == null) {
                 break;
             }
             TopologyDetails td = nextUser.getNextTopologyToSchedule();
             scheduleTopology(td);
         }
+        //since scheduling state might have been restored, we need to set the cluster and topologies.
+        cluster = this.cluster;
+        topologies = this.topologies;
     }
 
     private boolean makeSpaceForTopo(TopologyDetails td) {
+        LOG.info("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
         User submitter = this.userMap.get(td.getTopologySubmitter());
         if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null) {
             return false;
@@ -97,7 +126,7 @@ public class ResourceAwareScheduler implements IScheduler {
         double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
 
         //user has enough resource under his or her resource guarantee to schedule topology
-        if ((1.0 - submitter.getCPUResourcePoolUtilization()) > cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) > memoryNeeded) {
+        if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
             User evictUser = this.findUserWithMostResourcesAboveGuarantee();
             if (evictUser == null) {
                 LOG.info("Cannot make space for topology {} from user {}", td.getName(), submitter.getId());
@@ -109,6 +138,8 @@ public class ResourceAwareScheduler implements IScheduler {
             LOG.info("topology to evict: {}", topologyEvict);
             evictTopology(topologyEvict);
 
+            LOG.info("Resources After eviction:\n{}", this.nodes);
+
             return true;
         } else {
 
@@ -129,6 +160,7 @@ public class ResourceAwareScheduler implements IScheduler {
         User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
 
         LOG.info("Evicting Topology {} with workers: {}", topologyEvict.getName(), workersToEvict);
+        LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
         this.nodes.freeSlots(workersToEvict);
         submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
         LOG.info("check if topology unassigned: {}", this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId()));
@@ -137,9 +169,9 @@ public class ResourceAwareScheduler implements IScheduler {
     private User findUserWithMostResourcesAboveGuarantee() {
         double most = 0.0;
         User mostOverUser = null;
-        for(User user : this.userMap.values()) {
-            double over = user.getResourcePoolAverageUtilization() -1.0;
-            if((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+        for (User user : this.userMap.values()) {
+            double over = user.getResourcePoolAverageUtilization() - 1.0;
+            if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
                 most = over;
                 mostOverUser = user;
             }
@@ -147,63 +179,74 @@ public class ResourceAwareScheduler implements IScheduler {
         return mostOverUser;
     }
 
-    public void resetAssignments(Map<String, SchedulerAssignment> assignmentCheckpoint) {
-        this.cluster.setAssignments(assignmentCheckpoint);
-    }
-
     public void scheduleTopology(TopologyDetails td) {
-        ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
         User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
         if (cluster.getUnassignedExecutors(td).size() > 0) {
             LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
             LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
             LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
+            LOG.info("Nodes:\n{}", this.nodes);
+            LOG.debug("From cluster:\n{}", ResourceUtils.printScheduling(this.cluster, this.topologies));
+            LOG.debug("From Nodes:\n{}", ResourceUtils.printScheduling(this.nodes));
 
-            Map<String, SchedulerAssignment> assignmentCheckpoint = this.cluster.getAssignments();
-
+            SchedulingState schedulingState = this.checkpointSchedulingState();
             while (true) {
+                //Need to reinitialize ResourceAwareStrategy with cluster and topologies in case scheduling state was restored
+                ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
                 SchedulingResult result = RAStrategy.schedule(td);
                 LOG.info("scheduling result: {}", result);
                 if (result.isValid()) {
                     if (result.isSuccess()) {
                         try {
-                            if(mkAssignment(td, result.getSchedulingResultMap())) {
+                            if (mkAssignment(td, result.getSchedulingResultMap())) {
                                 topologySubmitter.moveTopoFromPendingToRunning(td, this.cluster);
                             } else {
-                                resetAssignments(assignmentCheckpoint);
+                                //resetAssignments(assignmentCheckpoint);
+                                this.restoreCheckpointSchedulingState(schedulingState);
+                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                                 topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
                             }
                         } catch (IllegalStateException ex) {
                             LOG.error(ex.toString());
-                            LOG.error("Unsuccessful in scheduling", td.getId());
+                            LOG.error("Unsuccessful in scheduling: IllegalStateException thrown!", td.getId());
                             this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-                            resetAssignments(assignmentCheckpoint);
+                            this.restoreCheckpointSchedulingState(schedulingState);
+                            //since state is restored, need to update User topologySubmitter to the new User object in userMap
+                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                             topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
                         }
                         break;
                     } else {
                         if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
-                            if(!this.makeSpaceForTopo(td)) {
-                                topologySubmitter.moveTopoFromPendingToAttempted(td);
+                            if (!this.makeSpaceForTopo(td)) {
                                 this.cluster.setStatus(td.getId(), result.getErrorMessage());
-                                resetAssignments(assignmentCheckpoint);
+                                this.restoreCheckpointSchedulingState(schedulingState);
+                                //since state is restored, need to update User topologySubmitter to the new User object in userMap
+                                topologySubmitter = this.userMap.get(td.getTopologySubmitter());
+                                topologySubmitter.moveTopoFromPendingToAttempted(td);
                                 break;
                             }
                             continue;
                         } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
+                            this.restoreCheckpointSchedulingState(schedulingState);
+                            //since state is restored, need to update User topologySubmitter to the new User object in userMap
+                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                             topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
-                            resetAssignments(assignmentCheckpoint);
                             break;
                         } else {
+                            this.restoreCheckpointSchedulingState(schedulingState);
+                            //since state is restored, need to update User topologySubmitter to the new User object in userMap
+                            topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                             topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
-                            resetAssignments(assignmentCheckpoint);
                             break;
                         }
                     }
                 } else {
                     LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
+                    this.restoreCheckpointSchedulingState(schedulingState);
+                    //since state is restored, need to update User topologySubmitter to the new User object in userMap
+                    topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                     topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
-                    resetAssignments(assignmentCheckpoint);
                     break;
                 }
             }
@@ -215,6 +258,7 @@ public class ResourceAwareScheduler implements IScheduler {
     }
 
     private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
+        LOG.info("making assignments for topology {}", td);
         if (schedulerAssignmentMap != null) {
             double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
             double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
@@ -272,69 +316,6 @@ public class ResourceAwareScheduler implements IScheduler {
         cluster.setSupervisorsResourcesMap(supervisors_resources);
     }
 
-
-//    private void scheduleTopology(TopologyDetails td) {
-//        ResourceAwareStrategy RAStrategy = new ResourceAwareStrategy(this.cluster, this.topologies);
-//        if (cluster.needsScheduling(td) && cluster.getUnassignedExecutors(td).size() > 0) {
-//            LOG.info("/********Scheduling topology {} from User {}************/", td.getName(), td.getTopologySubmitter());
-//            LOG.info("{}", this.userMap.get(td.getTopologySubmitter()).getDetailedInfo());
-//            LOG.info("{}", User.getResourcePoolAverageUtilizationForUsers(this.userMap.values()));
-//
-//            Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = RAStrategy.schedule(td);
-//
-//            double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
-//            double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
-//            double requestedCpu = td.getTotalRequestedCpu();
-//            double assignedMemOnHeap = 0.0;
-//            double assignedMemOffHeap = 0.0;
-//            double assignedCpu = 0.0;
-//
-//            if (schedulerAssignmentMap != null) {
-//                try {
-//                    Set<String> nodesUsed = new HashSet<String>();
-//                    int assignedWorkers = schedulerAssignmentMap.keySet().size();
-//                    for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
-//                        WorkerSlot targetSlot = workerToTasksEntry.getKey();
-//                        Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
-//                        RAS_Node targetNode = RAStrategy.idToNode(targetSlot.getNodeId());
-//                        targetNode.assign(targetSlot, td, execsNeedScheduling, this.cluster);
-//                        LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}",
-//                                td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
-//                        if (!nodesUsed.contains(targetNode.getId())) {
-//                            nodesUsed.add(targetNode.getId());
-//                        }
-//                        assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
-//                        assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
-//                        assignedCpu += targetSlot.getAllocatedCpu();
-//                    }
-//                    LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
-//                    this.cluster.setStatus(td.getId(), "Fully Scheduled");
-//                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToRunning(td);
-//                    LOG.info("getNextUser: {}", this.getNextUser());
-//                } catch (IllegalStateException ex) {
-//                    LOG.error(ex.toString());
-//                    LOG.error("Unsuccessful in scheduling", td.getId());
-//                    this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-//                    this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
-//                }
-//            } else {
-//                LOG.error("Unsuccessful in scheduling {}", td.getId());
-//                this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
-//              //  this.evictTopology(td);
-//               // this.getUser(td.getTopologySubmitter()).moveTopoFromPendingToAttempted(td);
-//            }
-//            Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-//                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
-//            LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
-//                            "assigned on-heap mem, off-heap mem, cpu: {} {} {}",
-//                    td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-//                    assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
-//            this.cluster.setResources(td.getId(), resources);
-//        } else {
-//            LOG.warn("Topology {} already scheduled!", td.getName());
-//            this.cluster.setStatus(td.getId(), "Fully Scheduled");
-//        }
-//    }
     public User getUser(String user) {
         return this.userMap.get(user);
     }
@@ -346,23 +327,30 @@ public class ResourceAwareScheduler implements IScheduler {
     public User getNextUser() {
         Double least = Double.POSITIVE_INFINITY;
         User ret = null;
-        for(User user : this.userMap.values()) {
-            LOG.info("{}", user.getDetailedInfo());
+        for (User user : this.userMap.values()) {
+            LOG.info("getNextUser {}", user.getDetailedInfo());
             LOG.info("hasTopologyNeedSchedule: {}", user.hasTopologyNeedSchedule());
-            if(user.hasTopologyNeedSchedule()) {
+            if (user.hasTopologyNeedSchedule()) {
                 Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+                if(ret!=null) {
+                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), least, user.getId(), userResourcePoolAverageUtilization);
+                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, least == userResourcePoolAverageUtilization);
+                    LOG.info("{} == {}: {}", least, userResourcePoolAverageUtilization, (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001));
+
+                }
                 if (least > userResourcePoolAverageUtilization) {
                     ret = user;
                     least = userResourcePoolAverageUtilization;
-                } else if (least == userResourcePoolAverageUtilization) {
-                    double currentCpuPercentage = ret.getCPUResourceGuaranteed()/this.cluster.getClusterTotalCPUResource();
-                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed()/this.cluster.getClusterTotalMemoryResource();
+                } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+                    double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+                    double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
                     double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
 
-                    double userCpuPercentage = user.getCPUResourceGuaranteed()/this.cluster.getClusterTotalCPUResource();
-                    double userMemoryPercentage = user.getMemoryResourceGuaranteed()/this.cluster.getClusterTotalMemoryResource();
+                    double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
+                    double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
                     double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
-                    if(userAvgPercentage > currentAvgPercentage) {
+                    LOG.info("current: {}-{} compareUser: {}-{}", ret.getId(), currentAvgPercentage, user.getId(), userAvgPercentage);
+                    if (userAvgPercentage > currentAvgPercentage) {
                         ret = user;
                         least = userResourcePoolAverageUtilization;
                     }
@@ -374,6 +362,7 @@ public class ResourceAwareScheduler implements IScheduler {
 
     /**
     * Initialize scheduling and running queues
+     *
      * @param topologies
      * @param cluster
      */
@@ -384,18 +373,16 @@ public class ResourceAwareScheduler implements IScheduler {
         LOG.info("userResourcePools: {}", userResourcePools);
 
         for (TopologyDetails td : topologies.getTopologies()) {
-            LOG.info("topology: {} from {}", td.getName(), td.getTopologySubmitter());
             String topologySubmitter = td.getTopologySubmitter();
-            if(topologySubmitter == null) {
+            if (topologySubmitter == null) {
                 LOG.warn("Topology {} submitted by anonymous user", td.getName());
                 topologySubmitter = "anonymous";
             }
-            if(!this.userMap.containsKey(topologySubmitter)) {
+            if (!this.userMap.containsKey(topologySubmitter)) {
                 this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
             }
-            if(cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
+            if (cluster.getUnassignedExecutors(td).size() >= td.getExecutors().size()) {
                 this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td, cluster);
-                LOG.info(this.userMap.get(topologySubmitter).getDetailedInfo());
             } else {
                 this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td, cluster);
             }
@@ -411,35 +398,62 @@ public class ResourceAwareScheduler implements IScheduler {
 
     /**
      * Get resource guarantee configs
+     *
      * @return
      */
     private Map<String, Map<String, Double>> getUserResourcePools() {
         Object raw = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
-        Map<String, Map<String, Double>> ret =  (Map<String, Map<String, Double>>)this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, Double>>();
 
-        if (raw == null) {
-            ret = new HashMap<String, Map<String, Double>>();
-        } else {
-            for(Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
+        if (raw != null) {
+            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : ((Map<String, Map<String, Number>>) raw).entrySet()) {
                 String user = UserPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
-                for(Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+                for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
                     ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                 }
             }
         }
 
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
-        Map<String, Map<String, Number>>tmp = (Map<String, Map<String, Number>>)fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        Map<String, Map<String, Number>> tmp = (Map<String, Map<String, Number>>) fromFile.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
         if (tmp != null) {
-            for(Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
+            for (Map.Entry<String, Map<String, Number>> UserPoolEntry : tmp.entrySet()) {
                 String user = UserPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
-                for(Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
+                for (Map.Entry<String, Number> resourceEntry : UserPoolEntry.getValue().entrySet()) {
                     ret.get(user).put(resourceEntry.getKey(), resourceEntry.getValue().doubleValue());
                 }
             }
         }
         return ret;
     }
+
+    private SchedulingState checkpointSchedulingState() {
+        LOG.info("checkpointing scheduling state...");
+        LOG.info("/*********Checkpoint************/");
+        for (User user : this.getUserMap().values()) {
+            LOG.info(user.getDetailedInfo());
+        }
+        LOG.info("/*********End************/");
+        return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
+    }
+
+    private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
+        LOG.info("restoring scheduling state...");
+        LOG.info("/*********Before************/");
+        for (User user : this.getUserMap().values()) {
+            LOG.info(user.getDetailedInfo());
+        }
+        this.cluster = schedulingState.cluster;
+        this.topologies = schedulingState.topologies;
+        this.conf = schedulingState.conf;
+        this.userMap = schedulingState.userMap;
+        this.nodes = schedulingState.nodes;
+        LOG.info("/*********After************/");
+        for (User user : this.getUserMap().values()) {
+            LOG.info(user.getDetailedInfo());
+        }
+        LOG.info("/*********End************/");
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
index 8e11384..02d48e1 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceUtils.java
@@ -181,4 +181,13 @@ public class ResourceUtils {
         }
         return str.toString();
     }
+
+    public static String printScheduling(RAS_Nodes nodes) {
+        String ret="";
+        for (RAS_Node node : nodes.getNodes()) {
+            ret += "Node: " + node.getHostname() + "\n";
+            ret += "-> " + node.getTopoIdTousedSlots() + "\n";
+        }
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 068db54..77a1dff 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -35,15 +35,15 @@ import java.util.TreeSet;
 public class User {
     private String userId;
     //Topologies yet to be scheduled sorted by priority for each user
-    private Set<TopologyDetails> pendingQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+    private TreeSet<TopologyDetails> pendingQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
    //Topologies currently running sorted by priority for each user
-    private Set<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+    private TreeSet<TopologyDetails> runningQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
    //Topologies that were attempted to be scheduled but weren't successful
-    private Set<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+    private TreeSet<TopologyDetails> attemptedQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
-    private Set<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+    private TreeSet<TopologyDetails> invalidQueue = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
 
     private Map<String, Double> resourcePool = new HashMap<String, Double>();
 
@@ -60,6 +60,23 @@ public class User {
         }
     }
 
+    public User getCopy() {
+        User newUser = new User(this.userId, this.resourcePool);
+        for(TopologyDetails topo :  this.pendingQueue) {
+            newUser.addTopologyToPendingQueue(topo);
+        }
+        for(TopologyDetails topo :  this.runningQueue) {
+            newUser.addTopologyToRunningQueue(topo);
+        }
+        for(TopologyDetails topo :  this.attemptedQueue) {
+            newUser.addTopologyToAttemptedQueue(topo);
+        }
+        for(TopologyDetails topo :  this.invalidQueue) {
+            newUser.addTopologyToInvalidQueue(topo);
+        }
+        return newUser;
+    }
+
     public String getId() {
         return this.userId;
     }
@@ -92,6 +109,14 @@ public class User {
         return ret;
     }
 
+    public void addTopologyToAttemptedQueue(TopologyDetails topo) {
+        this.attemptedQueue.add(topo);
+    }
+
+    public void addTopologyToInvalidQueue(TopologyDetails topo) {
+        this.invalidQueue.add(topo);
+    }
+
     public Set<TopologyDetails> getTopologiesRunning() {
         TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
         ret.addAll(this.runningQueue);
@@ -104,6 +129,11 @@ public class User {
         return ret;
     }
 
+    public Set<TopologyDetails> getTopologiesInvalid() {
+        TreeSet<TopologyDetails> ret = new TreeSet<TopologyDetails>(new PQsortByPriorityAndSubmittionTime());
+        ret.addAll(this.invalidQueue);
+        return ret;
+    }
     public Map<String, Number> getResourcePool() {
         if (this.resourcePool != null) {
             return new HashMap<String, Number>(this.resourcePool);
@@ -160,17 +190,12 @@ public class User {
 
 
     private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
-        LOG.info("{} queue: {}", srcName, src);
-        LOG.info("{} queue: {}", destName, dest);
+        LOG.info("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
         if (topo == null) {
             return;
         }
         if (!src.contains(topo)) {
             LOG.warn("Topo {} not in User: {} {} queue!", topo.getName(), this.userId, srcName);
-            LOG.info("topo {}-{}-{}", topo.getName(), topo.getId(), topo.hashCode());
-            for (TopologyDetails t : src) {
-                LOG.info("queue entry: {}-{}-{}", t.getName(), t.getId(), t.hashCode());
-            }
             return;
         }
         if (dest.contains(topo)) {
@@ -179,6 +204,8 @@ public class User {
         }
         src.remove(topo);
         dest.add(topo);
+        LOG.info("SRC: {}", src);
+        LOG.info("DEST: {}", dest);
     }
 
 
@@ -244,14 +271,15 @@ public class User {
     }
 
     public boolean hasTopologyNeedSchedule() {
-        return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
+        //return (!this.pendingQueue.isEmpty() && (this.pendingQueue.size() - this.attemptedQueue.size()) > 0);
+        return (!this.pendingQueue.isEmpty());
     }
 
     public TopologyDetails getRunningTopologyWithLowestPriority() {
         if (this.runningQueue.isEmpty()) {
             return null;
         }
-        return this.runningQueue.iterator().next();
+        return this.runningQueue.last();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
index bea41ff..3ff0af1 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
@@ -29,6 +29,8 @@ import backtype.storm.utils.Time;
 import backtype.storm.utils.Utils;
 import org.junit.Test;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,120 +39,61 @@ import java.util.Map;
  * Created by jerrypeng on 11/11/15.
  */
 public class Experiment {
-    @Test
-    public void TestMultipleUsers() {
-//        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
-//        Map<String, Number> resourceMap = new HashMap<String, Number>();
-//        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
-//        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
-//        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
-//        Config config = new Config();
-//        config.putAll(Utils.readDefaultConfig());
-//        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
-//        resourceUserPool.put("jerry", new HashMap<String, Number>());
-//        resourceUserPool.get("jerry").put("cpu", 1000);
-//        resourceUserPool.get("jerry").put("memory", 8192.0);
-//
-//        resourceUserPool.put("bobby", new HashMap<String, Number>());
-//        resourceUserPool.get("bobby").put("cpu", 10000.0);
-//        resourceUserPool.get("bobby").put("memory", 32768);
-//
-//        resourceUserPool.put("derek", new HashMap<String, Number>());
-//        resourceUserPool.get("derek").put("cpu", 5000.0);
-//        resourceUserPool.get("derek").put("memory", 16384.0);
-//
-//        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
-//        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-//
-//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-//
-//        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-//        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-//        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-//        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-//        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-//
-//        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-//        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-//        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-//        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-//        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-//
-//        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-//        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
-//        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
-//        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
-//        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
-//
-//
-//        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-//        topoMap.put(topo1.getId(), topo1);
-//        topoMap.put(topo2.getId(), topo2);
-//        topoMap.put(topo3.getId(), topo3);
-//        topoMap.put(topo4.getId(), topo4);
-//        topoMap.put(topo5.getId(), topo5);
-//        topoMap.put(topo6.getId(), topo6);
-//        topoMap.put(topo7.getId(), topo7);
-//        topoMap.put(topo8.getId(), topo8);
-//        topoMap.put(topo9.getId(), topo9);
-//        topoMap.put(topo10.getId(), topo10);
-//        topoMap.put(topo11.getId(), topo11);
-//        topoMap.put(topo12.getId(), topo12);
-//        topoMap.put(topo13.getId(), topo13);
-//        topoMap.put(topo14.getId(), topo14);
-//        topoMap.put(topo15.getId(), topo15);
-//
-//        Topologies topologies = new Topologies(topoMap);
-//
-//        ResourceAwareScheduler rs = new ResourceAwareScheduler();
-//
-//        rs.prepare(config);
-//        rs.schedule(topologies, cluster);
-//
-//        for(TopologyDetails topo : topoMap.values()) {
-//            Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
-//        }
-//
-//        for(User user : rs.getUserMap().values()) {
-//            Assert.assertEquals(user.getTopologiesPending().size(), 0);
-//            Assert.assertEquals(user.getTopologiesRunning().size(), 5);
-//        }
 
+    private static final Logger LOG = LoggerFactory.getLogger(Experiment.class);
+
+    /**
+     * Eviction order:
+     * topo-3: since user bobby doesn't have any resource guarantees and topo-3 is the lowest priority for user bobby
+     * topo-2: since user bobby doesn't have any resource guarantees and topo-2 is the next lowest priority for user bobby
+     * topo-5: since user derek has exceeded his resource guarantee while user jerry has not.  topo-5 and topo-4 have the same priority,
+     * but topo-4 was submitted earlier, thus we choose that one to evict
+     */
+    @Test
+    public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
         INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
         Map<String, Number> resourceMap = new HashMap<String, Number>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
-        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
         Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
-        resourceUserPool.get("jerry").put("cpu", 1000);
-        resourceUserPool.get("jerry").put("memory", 8192.0);
-
-        resourceUserPool.put("bobby", new HashMap<String, Number>());
-        resourceUserPool.get("bobby").put("cpu", 10000.0);
-        resourceUserPool.get("bobby").put("memory", 32768);
+        resourceUserPool.get("jerry").put("cpu", 300.0);
+        resourceUserPool.get("jerry").put("memory", 3000.0);
 
         resourceUserPool.put("derek", new HashMap<String, Number>());
-        resourceUserPool.get("derek").put("cpu", 5000.0);
-        resourceUserPool.get("derek").put("memory", 16384.0);
+        resourceUserPool.get("derek").put("cpu", 100.0);
+        resourceUserPool.get("derek").put("memory", 1000.0);
 
         config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
         Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
 
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
 
         Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-        topoMap.put(topo1.getId(), topo1);
         topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
 
         Topologies topologies = new Topologies(topoMap);
 
@@ -159,16 +102,120 @@ public class Experiment {
         rs.prepare(config);
         rs.schedule(topologies, cluster);
 
-        int fullyScheduled = 0;
-        for (TopologyDetails topo : topoMap.values()) {
-            if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
-                fullyScheduled++;
-            }
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        //user jerry submits another topology
+        topoMap.put(topo1.getId(), topo1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
+
+        topoMap.put(topo6.getId(), topo6);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
-        Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
-        Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
-        Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
-        Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+
+        topoMap.put(topo7.getId(), topo7);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+        Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
     }