Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:37 UTC

[09/23] storm git commit: first initial implementation

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
index 4fff405..3f2610c 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
@@ -49,6 +49,9 @@
       (into {}
         (for [at (range start end)]
           {(ed at) name})))))
+(def DEFAULT_PRIORITY_STRATEGY "backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy")
+(def DEFAULT_EVICTION_STRATEGY "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy")
+(def DEFAULT_SCHEDULING_STRATEGY "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy")
 
 ;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super
 (defn get-super->mem-usage [^Cluster cluster ^Topologies topologies]
@@ -154,7 +157,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology
                     1
                     (mk-ed-map [["wordSpout" 0 1]
@@ -165,7 +169,8 @@
         topologies (Topologies. (to-top-map [topology1]))
         node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
         scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {})
+    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -173,7 +178,7 @@
       (is (= 1 (.size assigned-slots)))
       (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
       (is (= 2 (.size executors))))
-    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))))
+    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
 
 (deftest test-topology-with-multiple-spouts
   (let [builder1 (TopologyBuilder.)  ;; a topology with multiple spouts
@@ -195,7 +200,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology1
                     1
                     (mk-ed-map [["wordSpout1" 0 1]
@@ -216,7 +222,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology2
                     1
                     (mk-ed-map [["wordSpoutX" 0 1]
@@ -227,7 +234,8 @@
                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology1 topology2]))
         scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {})
+    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -235,14 +243,14 @@
       (is (= 1 (.size assigned-slots)))
       (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
       (is (= 7 (.size executors))))
-    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
     (let [assignment (.getAssignmentById cluster "topology2")
           assigned-slots (.getSlots assignment)
           executors (.getExecutors assignment)]
       (is (= 1 (.size assigned-slots)))
       (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
       (is (= 2 (.size executors))))
-    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
 
 (deftest test-topology-set-memory-and-cpu-load
   (let [builder (TopologyBuilder.)
@@ -261,7 +269,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology
                     2
                     (mk-ed-map [["wordSpout" 0 1]
@@ -271,7 +280,8 @@
                    "backtype.storm.testing.AlternateRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology2]))
         scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {})
+    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology2")
           assigned-slots (.getSlots assignment)
@@ -280,7 +290,7 @@
       (is (= 1 (.size assigned-slots)))
       (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
       (is (= 2 (.size executors))))
-    (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
 
 (deftest test-resource-limitation
   (let [builder (TopologyBuilder.)
@@ -300,7 +310,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology
                     2 ;; need two workers, each on one node
                     (mk-ed-map [["wordSpout" 0 2]
@@ -310,7 +321,8 @@
                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology1]))
         scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {})
+    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -344,7 +356,7 @@
       (is (>= avail used)))
     (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
       (is (>= avail used))))
-  (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))))
+  (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
 
 (deftest test-scheduling-resilience
   (let [supers (gen-supervisors 2 2)
@@ -358,7 +370,8 @@
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                      TOPOLOGY-PRIORITY 0}
+                      TOPOLOGY-PRIORITY 0
+                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology1
                      3 ;; three workers to hold three executors
                      (mk-ed-map [["spout1" 0 3]]))
@@ -372,7 +385,8 @@
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                      TOPOLOGY-PRIORITY 0}
+                      TOPOLOGY-PRIORITY 0
+                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                      storm-topology2
                      2  ;; two workers, each holds one executor and resides on one node
                      (mk-ed-map [["spout2" 0 2]]))
@@ -383,7 +397,8 @@
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology2]))
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
             assignment (.getAssignmentById cluster "topology2")
             failed-worker (first (vec (.getSlots assignment)))  ;; choose a worker to mock as failed
@@ -392,14 +407,15 @@
             _ (doseq [ed failed-eds] (.remove ed->slot ed))  ;; remove executor details assigned to the worker
             copy-old-mapping (HashMap. ed->slot)
             healthy-eds (.keySet copy-old-mapping)
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
             new-assignment (.getAssignmentById cluster "topology2")
             new-ed->slot (.getExecutorToSlot new-assignment)]
        ;; for each executor that was scheduled on a healthy worker, its slot should remain unchanged after a new scheduling
         (doseq [ed healthy-eds]
           (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
 
     (testing "When a supervisor fails, RAS does not alter existing assignments"
       (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
@@ -419,7 +435,8 @@
                                   (.getAssignments cluster)
                                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
             new-assignment (.getAssignmentById new-cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -446,33 +463,36 @@
                                   (.getAssignments cluster)
                                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies new-cluster)
             new-assignment (.getAssignmentById new-cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
         (doseq [ed existing-eds]
           (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
-        (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1")))))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap new-cluster) "topology1")))))
 
     (testing "Scheduling a new topology does not disturb other assignments unnecessarily"
       (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1]))
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
             assignment (.getAssignmentById cluster "topology1")
             ed->slot (.getExecutorToSlot assignment)
             copy-old-mapping (HashMap. ed->slot)
             new-topologies (Topologies. (to-top-map [topology1 topology2]))  ;; a second topology joins
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler new-topologies cluster)
             new-assignment (.getAssignmentById cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
         (doseq [ed (.keySet copy-old-mapping)]
           (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))  ;; the assignment for topo1 should not change
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))))
 
 ;; Automated tests for heterogeneous cluster
 (deftest test-heterogeneous-cluster
@@ -497,7 +517,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology1
                     1
                     (mk-ed-map [["spout1" 0 1]]))
@@ -513,7 +534,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology2
                     2
                     (mk-ed-map [["spout2" 0 4]]))
@@ -529,7 +551,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology3
                     2
                     (mk-ed-map [["spout3" 0 4]]))
@@ -545,7 +568,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology4
                     2
                     (mk-ed-map [["spout4" 0 12]]))
@@ -561,7 +585,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0}
+                     TOPOLOGY-PRIORITY 0
+                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                     storm-topology5
                     2
                     (mk-ed-map [["spout5" 0 40]]))
@@ -574,13 +599,14 @@
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
             scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
             super->mem-usage (get-super->mem-usage cluster topologies)
             super->cpu-usage (get-super->cpu-usage cluster topologies)]
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology3")))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology3")))
         (doseq [super (.values supers)]
           (let [mem-avail (.getTotalMemory super)
                 mem-used (.get super->mem-usage super)
@@ -595,11 +621,12 @@
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
             scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
-                scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")) 1 0)
-                scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")) 1 0))
-                scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology4")) 1 0))]
+                scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")) 1 0)
+                scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")) 1 0))
+                scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology4")) 1 0))]
             (is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled
 
     (testing "Launch topo5 only, both mem and cpu should be exactly used up"
@@ -608,11 +635,12 @@
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology5]))
             scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {})
+            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
             _ (.schedule scheduler topologies cluster)
             super->mem-usage (get-super->mem-usage cluster topologies)
             super->cpu-usage (get-super->cpu-usage cluster topologies)]
-        (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology5")))
+        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology5")))
         (doseq [super (.values supers)]
           (let [mem-avail (.getTotalMemory super)
                 mem-used (.get super->mem-usage super)
@@ -638,14 +666,16 @@
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                        TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                       TOPOLOGY-PRIORITY 0}
+                       TOPOLOGY-PRIORITY 0
+                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
                       (mk-ed-map [["spout1" 0 4]]))
           topologies (Topologies. (to-top-map [topology1]))]
-      (.prepare scheduler {"userA" {"cpu" 2000.0 "memory" 400.0}})
+      (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                           RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
       (.schedule scheduler topologies cluster)
-      (is (= (.get (.getStatusMap cluster) "topology1") "Fully Scheduled"))
+      (is (= (.get (.getStatusMap cluster) "topology1") "Running - Fully Scheduled by DefaultResourceAwareStrategy"))
       (is (= (.getAssignedNumWorkers cluster topology1) 4)))
    (testing "test when no more workers are available due to topology worker max heap size limit but memory is still available")
     (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
@@ -662,18 +692,20 @@
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                        TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                       TOPOLOGY-PRIORITY 0}
+                       TOPOLOGY-PRIORITY 0
+                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
                       storm-topology1
                       1
                       (mk-ed-map [["spout1" 0 5]]))
           topologies (Topologies. (to-top-map [topology1]))]
-      (.prepare scheduler {})
+      (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+                           RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
       (.schedule scheduler topologies cluster)
      ;;spout1 is going to contain 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB
      ;;The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0 MB
      ;;Thus, one executor cannot be scheduled, which fails the scheduling of this topology, and no executors of this topology will be scheduled
       (is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
-      (is (= (.get (.getStatusMap cluster) "topology1")  "Not all executors successfully scheduled: [[1, 1]]")))
+      (is (= (.get (.getStatusMap cluster) "topology1")  "Not enough resources to schedule - 0/5 executors scheduled")))
 
     (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
@@ -688,7 +720,8 @@
                  TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                  TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
                  TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                 TOPOLOGY-PRIORITY 0}
+                 TOPOLOGY-PRIORITY 0
+                 TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
           topology1 (TopologyDetails. "topology1"
                       conf
                       storm-topology1

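Every Clojure test above changes in the same two ways: the bare (.prepare scheduler {}) call now passes the eviction and priority strategy classes, and each topology conf gains TOPOLOGY-SCHEDULER-STRATEGY. A minimal Java sketch of the same wiring, using only the Config constants and strategy class names that appear in the Java tests later in this commit (the surrounding class is illustrative, not part of the patch):

    import backtype.storm.Config;

    public class RasStrategySketch {
        // Builds a conf with the three strategies these tests exercise.
        public static Config defaultRasStrategies() {
            Config conf = new Config();
            // Cluster-level strategies, consumed by ResourceAwareScheduler.prepare(conf):
            conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY,
                    "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy");
            conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY,
                    "backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy");
            // Per-topology choice of scheduling strategy:
            conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY,
                    "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
            return conf;
        }
    }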
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
deleted file mode 100644
index 3ff0af1..0000000
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource;
-
-import backtype.storm.Config;
-import backtype.storm.scheduler.Cluster;
-import backtype.storm.scheduler.INimbus;
-import backtype.storm.scheduler.SchedulerAssignmentImpl;
-import backtype.storm.scheduler.SupervisorDetails;
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.utils.Time;
-import backtype.storm.utils.Utils;
-import org.junit.Test;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Created by jerrypeng on 11/11/15.
- */
-public class Experiment {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Experiment.class);
-
-    /**
-     * Eviction order:
-     * topo-3: since user bobby don't have any resource guarantees and topo-3 is the lowest priority for user bobby
-     * topo-2: since user bobby don't have any resource guarantees and topo-2 is the next lowest priority for user bobby
-     * topo-5: since user derek has exceeded his resource guarantee while user jerry has not.  topo-5 and topo-4 has the same priority
-     * but topo-4 was submitted earlier thus we choose that one to evict
-     */
-    @Test
-    public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
-        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
-        Map<String, Number> resourceMap = new HashMap<String, Number>();
-        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
-        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
-        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
-        Config config = new Config();
-        config.putAll(Utils.readDefaultConfig());
-        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
-        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
-        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
-        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
-        resourceUserPool.put("jerry", new HashMap<String, Number>());
-        resourceUserPool.get("jerry").put("cpu", 300.0);
-        resourceUserPool.get("jerry").put("memory", 3000.0);
-
-        resourceUserPool.put("derek", new HashMap<String, Number>());
-        resourceUserPool.get("derek").put("cpu", 100.0);
-        resourceUserPool.get("derek").put("memory", 1000.0);
-
-        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
-        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
-        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
-        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-
-        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
-        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
-        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
-
-        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
-        topoMap.put(topo2.getId(), topo2);
-        topoMap.put(topo3.getId(), topo3);
-        topoMap.put(topo4.getId(), topo4);
-        topoMap.put(topo5.getId(), topo5);
-
-        Topologies topologies = new Topologies(topoMap);
-
-        ResourceAwareScheduler rs = new ResourceAwareScheduler();
-
-        rs.prepare(config);
-        rs.schedule(topologies, cluster);
-
-        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
-
-        //user jerry submits another topology
-        topoMap.put(topo1.getId(), topo1);
-        topologies = new Topologies(topoMap);
-        rs.schedule(topologies, cluster);
-
-        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
-        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
-        Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
-
-        topoMap.put(topo6.getId(), topo6);
-        topologies = new Topologies(topoMap);
-        rs.schedule(topologies, cluster);
-
-        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
-        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
-
-        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
-        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
-
-        topoMap.put(topo7.getId(), topo7);
-        topologies = new Topologies(topoMap);
-        rs.schedule(topologies, cluster);
-
-        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
-        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
-            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
-            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
-        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-        Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
-
-        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
-        }
-        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
-        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
-        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
-        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
-
-        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
-        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
-    }
-
-
-}

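The deleted Experiment.java above was a scratch test for eviction ordering under per-user resource guarantees; the scenario lives on in TestResourceAwareScheduler below. For reference, a condensed sketch of the user-pool setup it used, assuming only the map shape and Config key visible in the deleted code (UserPoolSketch and addGuarantee are hypothetical names):

    import backtype.storm.Config;

    import java.util.HashMap;
    import java.util.Map;

    public class UserPoolSketch {
        // Hypothetical helper: one user's guarantee is a {"cpu" -> percent, "memory" -> MB} map.
        static void addGuarantee(Map<String, Map<String, Number>> pools,
                                 String user, double cpuPercent, double memoryMb) {
            Map<String, Number> guarantee = new HashMap<String, Number>();
            guarantee.put("cpu", cpuPercent);
            guarantee.put("memory", memoryMb);
            pools.put(user, guarantee);
        }

        static Config withUserPools() {
            Map<String, Map<String, Number>> resourceUserPool =
                    new HashMap<String, Map<String, Number>>();
            addGuarantee(resourceUserPool, "jerry", 300.0, 3000.0);  // values from the deleted test
            addGuarantee(resourceUserPool, "derek", 100.0, 1000.0);
            Config config = new Config();
            config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
            return config;
        }
    }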
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index b41b039..ffdf460 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -20,8 +20,10 @@ package backtype.storm.scheduler.resource;
 
 import backtype.storm.Config;
 import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.INimbus;
 import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SchedulerAssignment;
 import backtype.storm.scheduler.SchedulerAssignmentImpl;
 import backtype.storm.scheduler.SupervisorDetails;
 import backtype.storm.scheduler.Topologies;
@@ -66,6 +68,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
 
         config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
 
@@ -175,6 +180,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
         Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -240,7 +248,7 @@ public class TestResourceAwareScheduler {
         rs.schedule(topologies, cluster);
 
         for (TopologyDetails topo : topoMap.values()) {
-            Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+            Assert.assertTrue(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
 
         for (User user : rs.getUserMap().values()) {
@@ -258,6 +266,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
         Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
         resourceUserPool.put("jerry", new HashMap<String, Number>());
         resourceUserPool.get("jerry").put("cpu", 1000);
@@ -316,6 +327,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
         config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -432,6 +446,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
         config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -539,6 +556,9 @@ public class TestResourceAwareScheduler {
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
         Config config = new Config();
         config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
         config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
         config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -653,7 +673,7 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
 
         for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
-            Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
         }
         Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
         Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -698,4 +718,459 @@ public class TestResourceAwareScheduler {
         Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
         Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
     }
+
+    /**
+     * If topologies from other users cannot be evicted to make space,
+     * check if there is a lower-priority topology that can be evicted from the current user
+     */
+    @Test
+    public void TestEvictTopologyFromItself() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 200.0);
+        resourceUserPool.get("jerry").put("memory", 2000.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 100.0);
+        resourceUserPool.get("bobby").put("memory", 1000.0);
+
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 100.0);
+        resourceUserPool.get("derek").put("memory", 1000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo5.getId(), topo5);
+        topoMap.put(topo6.getId(), topo6);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        //user jerry submits another topology into a full cluster
+        // topo3 should not be able to be scheduled
+        topoMap.put(topo3.getId(), topo3);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+        // Make sure that topo-3 didn't get scheduled.
+        Assert.assertEquals("correct topology in attempted queue", "topo-3", rs.getUser("jerry").getTopologiesAttempted().iterator().next().getName());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+        // User jerry submits another topology, but this one should be scheduled since it has a higher
+        // priority than the rest of jerry's running topologies.
+        topoMap.put(topo4.getId(), topo4);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+        Assert.assertTrue("correct topology in attempted queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("jerry").getTopologiesAttempted()) != null);
+        //Either topo-1 or topo-2 should have gotten evicted
+        Assert.assertTrue("correct topology in attempted queue", ((TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-1", rs.getUser("jerry").getTopologiesAttempted())) != null)
+                || (TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("jerry").getTopologiesAttempted()) != null));
+        //assert that topo-4 got scheduled
+        Assert.assertTrue("correct topology in running queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-4", rs.getUser("jerry").getTopologiesRunning()) != null);
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+    }
+
+    /**
+     * If topologies from other users cannot be evicted to make space,
+     * check whether the current user has a lower-priority topology that can be evicted instead
+     */
+    @Test
+    public void TestOverGuaranteeEviction() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 70.0);
+        resourceUserPool.get("jerry").put("memory", 700.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 100.0);
+        resourceUserPool.get("bobby").put("memory", 1000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 25.0);
+        resourceUserPool.get("derek").put("memory", 250.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
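+        // Total capacity: 4 supervisors x (100% CPU, 1000 MB).  Each topology below is a
+        // single executor requesting 100% CPU and 1000 MB (500 on-heap + 500 off-heap),
+        // so one topology fills one supervisor and at most four can run at a time.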
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
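+        // Priorities (lower value = higher priority): jerry's topo-1/topo-2 at 20,
+        // bobby's topo-3/topo-4 at 10, derek's topo-5 at 30 and topo-6 at 10.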
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
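+        // topo-1, topo-3, topo-4, and topo-5 fill the four supervisors exactly:
+        // jerry and derek each run one topology, bobby runs two.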
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        // User derek submits another topology into a full cluster;
+        // topo-6 should not be able to be scheduled.
+        topoMap.put(topo6.getId(), topo6);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+        // topo-5 will be evicted since topo-6 has a higher priority (lower TOPOLOGY_PRIORITY value)
+        Assert.assertEquals("correct topology in attempted queue", "topo-5", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+        // User jerry submits topo-2; derek's remaining topology (topo-6) is evicted to
+        // make room, since derek is the user furthest over his resource guarantee.
+        topoMap.put(topo2.getId(), topo2);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+        Assert.assertEquals("correct topology in attempted queue", "topo-6", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+    }
+
+    /**
+     * Test correct behavior when a supervisor dies.  Check that the scheduler handles it correctly and evicts the right
+     * topology when rescheduling the executors from the failed supervisor
+     */
+    @Test
+    public void TestFaultTolerance() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(6, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 50.0);
+        resourceUserPool.get("jerry").put("memory", 500.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 200.0);
+        resourceUserPool.get("bobby").put("memory", 2000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 100.0);
+        resourceUserPool.get("derek").put("memory", 1000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
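+        // Total capacity: 6 supervisors x (100% CPU, 1000 MB); each topology again fills
+        // exactly one supervisor, so the six topologies below saturate the cluster.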
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
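+        // Guarantees: bobby (200 CPU / 2000 MB) covers both of his topologies, derek
+        // (100 / 1000) covers one of his two, and jerry (50 / 500) covers neither of his,
+        // so jerry ends up furthest over his guarantee once everything is running.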
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+        topoMap.put(topo6.getId(), topo6);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        //fail supervisor
+        SupervisorDetails supFailed = cluster.getSupervisors().values().iterator().next();
+        LOG.info("/***** failing supervisor: {} ****/", supFailed.getHost());
+        supMap.remove(supFailed.getId());
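+        // Rebuild every assignment without the slots that lived on the failed supervisor,
+        // simulating what Nimbus would see after the node drops out of the cluster.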
+        Map<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
+        for (Map.Entry<String, SchedulerAssignment> topoToAssignment : cluster.getAssignments().entrySet()) {
+            String topoId = topoToAssignment.getKey();
+            SchedulerAssignment assignment = topoToAssignment.getValue();
+            Map<ExecutorDetails, WorkerSlot> executorToSlots = new HashMap<ExecutorDetails, WorkerSlot>();
+            for (Map.Entry<ExecutorDetails, WorkerSlot> execToWorker : assignment.getExecutorToSlot().entrySet()) {
+                ExecutorDetails exec = execToWorker.getKey();
+                WorkerSlot ws = execToWorker.getValue();
+                if (!ws.getNodeId().equals(supFailed.getId())) {
+                    executorToSlots.put(exec, ws);
+                }
+            }
+            newAssignments.put(topoId, new SchedulerAssignmentImpl(topoId, executorToSlots));
+        }
+        Map<String, String> statusMap = cluster.getStatusMap();
+        cluster = new Cluster(iNimbus, supMap, newAssignments, config);
+        cluster.setStatusMap(statusMap);
+
+        rs.schedule(topologies, cluster);
+
+        // The failed supervisor hosted an executor from topo-6 (user derek).  A topology from user jerry should be
+        // evicted instead, since jerry is further above his resource guarantee than derek.
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
index e9bf039..24ff980 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
@@ -104,8 +104,8 @@ public class TestUser {
         Assert.assertEquals("check cpu resource guarantee", cpuGuarantee, user1.getCPUResourceGuaranteed(), 0.001);
         Assert.assertEquals("check memory resource guarantee", memoryGuarantee, user1.getMemoryResourceGuaranteed(), 0.001);
 
-        Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization().doubleValue(), 0.001);
-        Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization().doubleValue(), 0.001);
+        Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization(), 0.001);
+        Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization(), 0.001);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7721300..2098f0c 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -97,31 +97,31 @@ public class TestUtilsForResourceAwareScheduler {
 
     public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
         Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
-        for(int i=0; i<numSup; i++) {
+        for (int i = 0; i < numSup; i++) {
             List<Number> ports = new LinkedList<Number>();
-            for(int j = 0; j<numPorts; j++) {
+            for (int j = 0; j < numPorts; j++) {
                 ports.add(j);
             }
-            SupervisorDetails sup = new SupervisorDetails("sup-"+i, "host-"+i, null, ports, resourceMap);
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, resourceMap);
             retList.put(sup.getId(), sup);
         }
         return retList;
     }
 
     public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
-        Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String> ();
-        int startTask=0;
-        int endTask=1;
-        for(Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
-            for(int i=0; i<spoutParallelism; i++) {
+        Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
+        int startTask = 0;
+        int endTask = 1;
+        for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+            for (int i = 0; i < spoutParallelism; i++) {
                 retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
                 startTask++;
                 endTask++;
             }
         }
 
-        for(Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
-            for(int i=0; i<boltParallelism; i++) {
+        for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+            for (int i = 0; i < boltParallelism; i++) {
                 retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
                 startTask++;
                 endTask++;
@@ -131,14 +131,14 @@ public class TestUtilsForResourceAwareScheduler {
     }
 
     public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
-                                       int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+                                              int spoutParallelism, int boltParallelism, int launchTime, int priority) {
 
         Config conf = new Config();
         conf.putAll(config);
         conf.put(Config.TOPOLOGY_PRIORITY, priority);
         conf.put(Config.TOPOLOGY_NAME, name);
         conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
-        StormTopology topology = buildTopology(numSpout,numBolt, spoutParallelism, boltParallelism);
+        StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
                 genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
@@ -258,7 +258,7 @@ public class TestUtilsForResourceAwareScheduler {
 
         @Override
         public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
-            if(existingSupervisors.containsKey(nodeId)) {
+            if (existingSupervisors.containsKey(nodeId)) {
                 return existingSupervisors.get(nodeId).getHost();
             }
             return null;
@@ -270,10 +270,10 @@ public class TestUtilsForResourceAwareScheduler {
         }
     }
 
-    private static boolean isContain(String source, String subItem){
-        String pattern = "\\b"+subItem+"\\b";
-        Pattern p=Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
-        Matcher m=p.matcher(source);
+    private static boolean isContain(String source, String subItem) {
+        String pattern = "\\b" + subItem + "\\b";
+        Pattern p = Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
+        Matcher m = p.matcher(source);
         return m.find();
     }
 
@@ -283,8 +283,8 @@ public class TestUtilsForResourceAwareScheduler {
 
     public static TopologyDetails findTopologyInSetFromName(String topoName, Set<TopologyDetails> set) {
         TopologyDetails ret = null;
-        for(TopologyDetails entry : set) {
-            if(entry.getName().equals(topoName)) {
+        for (TopologyDetails entry : set) {
+            if (entry.getName().equals(topoName)) {
                 ret = entry;
             }
         }