Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:30 UTC

[02/23] storm git commit: adding unit tests for STORM-898

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
index aec297d..4fff405 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
@@ -24,7 +24,7 @@
            [backtype.storm.topology TopologyBuilder])
   (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
             SchedulerAssignmentImpl Topologies TopologyDetails])
-  (:import [backtype.storm.scheduler.resource RAS_Node ResourceAwareScheduler])
+  (:import [backtype.storm.scheduler.resource RAS_Node RAS_Nodes ResourceAwareScheduler])
   (:import [backtype.storm Config StormSubmitter])
   (:import [backtype.storm LocalDRPC LocalCluster])
   (:import [java.util HashMap]))
@@ -92,12 +92,12 @@
                     super (+ cpu (.get super->cpu-usage super))))))  ;; add all topo's cpu usage for each super
     super->cpu-usage))
 
-;; testing resource/Node class
+; testing resource/Node class
 (deftest test-node
   (let [supers (gen-supervisors 5 4)
         cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
         topologies (Topologies. (to-top-map []))
-        node-map (RAS_Node/getAllNodesFrom cluster topologies)
+        node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
         topology1 (TopologyDetails. "topology1" {} nil 0)
         topology2 (TopologyDetails. "topology2" {} nil 0)]
     (is (= 5 (.size node-map)))
@@ -109,31 +109,31 @@
       (is (= 4 (.totalSlotsFree node)))
       (is (= 0 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology1 (list (ExecutorDetails. 1 1)) cluster)
+      (.assign node topology1 (list (ExecutorDetails. 1 1)))
       (is (= 1 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 3 (.totalSlotsFree node)))
       (is (= 1 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology1 (list (ExecutorDetails. 2 2)) cluster)
+      (.assign node topology1 (list (ExecutorDetails. 2 2)))
       (is (= 1 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 2 (.totalSlotsFree node)))
       (is (= 2 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology2 (list (ExecutorDetails. 1 1)) cluster)
+      (.assign node topology2 (list (ExecutorDetails. 1 1)))
       (is (= 2 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 1 (.totalSlotsFree node)))
       (is (= 3 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.assign node topology2 (list (ExecutorDetails. 2 2)) cluster)
+      (.assign node topology2 (list (ExecutorDetails. 2 2)))
       (is (= 2 (.size (.getRunningTopologies node))))
       (is (= false (.isTotallyFree node)))
       (is (= 0 (.totalSlotsFree node)))
       (is (= 4 (.totalSlotsUsed node)))
       (is (= 4 (.totalSlots node)))
-      (.freeAllSlots node cluster)
+      (.freeAllSlots node)
       (is (= 0 (.size (.getRunningTopologies node))))
       (is (= true (.isTotallyFree node)))
       (is (= 4 (.totalSlotsFree node)))
@@ -153,7 +153,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology
                     1
                     (mk-ed-map [["wordSpout" 0 1]
@@ -162,8 +163,9 @@
                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology1]))
-        node-map (RAS_Node/getAllNodesFrom cluster topologies)
+        node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
         scheduler (ResourceAwareScheduler.)]
+    (.prepare scheduler {})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -192,7 +194,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology1
                     1
                     (mk-ed-map [["wordSpout1" 0 1]
@@ -212,7 +215,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology2
                     1
                     (mk-ed-map [["wordSpoutX" 0 1]
@@ -223,6 +227,7 @@
                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology1 topology2]))
         scheduler (ResourceAwareScheduler.)]
+    (.prepare scheduler {})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -255,7 +260,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology
                     2
                     (mk-ed-map [["wordSpout" 0 1]
@@ -265,6 +271,7 @@
                    "backtype.storm.testing.AlternateRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology2]))
         scheduler (ResourceAwareScheduler.)]
+    (.prepare scheduler {})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology2")
           assigned-slots (.getSlots assignment)
@@ -292,7 +299,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology
                     2 ;; need two workers, each on one node
                     (mk-ed-map [["wordSpout" 0 2]
@@ -302,6 +310,7 @@
                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
         topologies (Topologies. (to-top-map [topology1]))
         scheduler (ResourceAwareScheduler.)]
+    (.prepare scheduler {})
     (.schedule scheduler topologies cluster)
     (let [assignment (.getAssignmentById cluster "topology1")
           assigned-slots (.getSlots assignment)
@@ -331,7 +340,7 @@
     (is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon))
     (is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon))
     (is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon))
-    (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total 
+    (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total
       (is (>= avail used)))
     (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
       (is (>= avail used))))
@@ -348,7 +357,8 @@
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                      TOPOLOGY-PRIORITY 0}
                      storm-topology1
                      3 ;; three workers to hold three executors
                      (mk-ed-map [["spout1" 0 3]]))
@@ -361,7 +371,8 @@
                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node
                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                      TOPOLOGY-PRIORITY 0}
                      storm-topology2
                      2  ;; two workers, each holds one executor and resides on one node
                      (mk-ed-map [["spout2" 0 2]]))
@@ -372,6 +383,7 @@
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology2]))
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
             assignment (.getAssignmentById cluster "topology2")
             failed-worker (first (vec (.getSlots assignment)))  ;; choose a worker to mock as failed
@@ -380,6 +392,7 @@
             _ (doseq [ed failed-eds] (.remove ed->slot ed))  ;; remove executor details assigned to the worker
             copy-old-mapping (HashMap. ed->slot)
             healthy-eds (.keySet copy-old-mapping)
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
             new-assignment (.getAssignmentById cluster "topology2")
             new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -387,7 +400,7 @@
         (doseq [ed healthy-eds]
           (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
         (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
-    
+
     (testing "When a supervisor fails, RAS does not alter existing assignments"
       (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
                                                                         {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0)    ;; worker 0 on the failed super
@@ -401,11 +414,12 @@
             ed->slot (.getExecutorToSlot assignment)
             copy-old-mapping (HashMap. ed->slot)
             existing-eds (.keySet copy-old-mapping)  ;; all the three eds on three workers
-            new-cluster (Cluster. (nimbus/standalone-nimbus) 
+            new-cluster (Cluster. (nimbus/standalone-nimbus)
                                   (dissoc supers "id0")        ;; mock the super0 as a failed supervisor
                                   (.getAssignments cluster)
                                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
             new-assignment (.getAssignmentById new-cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -427,11 +441,12 @@
             _ (.remove ed->slot (ExecutorDetails. 1 1))  ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing
             copy-old-mapping (HashMap. ed->slot)
             existing-eds (.keySet copy-old-mapping)  ;; namely the two eds on the orphaned worker and the healthy worker
-            new-cluster (Cluster. (nimbus/standalone-nimbus) 
+            new-cluster (Cluster. (nimbus/standalone-nimbus)
                                   (dissoc supers "id0")        ;; mock the super0 as a failed supervisor
                                   (.getAssignments cluster)
                                   {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                    "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies new-cluster)
             new-assignment (.getAssignmentById new-cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -444,11 +459,13 @@
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1]))
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
             assignment (.getAssignmentById cluster "topology1")
             ed->slot (.getExecutorToSlot assignment)
             copy-old-mapping (HashMap. ed->slot)
             new-topologies (Topologies. (to-top-map [topology1 topology2]))  ;; a second topology joins
+            _ (.prepare scheduler {})
             _ (.schedule scheduler new-topologies cluster)
             new-assignment (.getAssignmentById cluster "topology1")
             new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -459,17 +476,17 @@
 
 ;; Automated tests for heterogeneous cluster
 (deftest test-heterogeneous-cluster
-  (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list ) 
+  (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
                                                          (map int (list 1 2 3 4))
                                                          {Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0
                                                           Config/SUPERVISOR_CPU_CAPACITY 800.0})
-                                     (SupervisorDetails. (str "id" 1) (str "host" 1) (list ) 
+                                     (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
                                                          (map int (list 1 2 3 4))
                                                          {Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0
                                                           Config/SUPERVISOR_CPU_CAPACITY 200.0})]]
                           {(.getId super) super}))
         builder1 (TopologyBuilder.)  ;; topo1 has one single huge task that can not be handled by the small-super
-        _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1) 
+        _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
             (.setMemoryLoad 2000.0 48.0)
             (.setCPULoad 300.0))
         storm-topology1 (.createTopology builder1)
@@ -479,7 +496,8 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology1
                     1
                     (mk-ed-map [["spout1" 0 1]]))
@@ -494,9 +512,10 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology2
-                    2 
+                    2
                     (mk-ed-map [["spout2" 0 4]]))
         builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
         _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
@@ -509,9 +528,10 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology3
-                    2 
+                    2
                     (mk-ed-map [["spout3" 0 4]]))
        builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each task's mem req does not exactly divide a node's mem capacity
         _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
@@ -524,9 +544,10 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology4
-                    2 
+                    2
                     (mk-ed-map [["spout4" 0 12]]))
        builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster
         _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
@@ -539,26 +560,28 @@
                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+                     TOPOLOGY-PRIORITY 0}
                     storm-topology5
-                    2 
+                    2
                     (mk-ed-map [["spout5" 0 40]]))
         epsilon 0.000001
-        topologies (Topologies. (to-top-map [topology1 topology2]))
-        scheduler (ResourceAwareScheduler.)]
+        topologies (Topologies. (to-top-map [topology1 topology2]))]
 
     (testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division"
       (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+            scheduler (ResourceAwareScheduler.)
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
             super->mem-usage (get-super->mem-usage cluster topologies)
             super->cpu-usage (get-super->cpu-usage cluster topologies)]
         (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
         (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
         (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology3")))
-        (doseq [super (.values supers)] 
+        (doseq [super (.values supers)]
           (let [mem-avail (.getTotalMemory super)
                 mem-used (.get super->mem-usage super)
                 cpu-avail (.getTotalCPU super)
@@ -571,6 +594,8 @@
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+            scheduler (ResourceAwareScheduler.)
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
                 scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")) 1 0)
                 scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")) 1 0))
@@ -582,11 +607,13 @@
                               {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                                "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
             topologies (Topologies. (to-top-map [topology5]))
+            scheduler (ResourceAwareScheduler.)
+            _ (.prepare scheduler {})
             _ (.schedule scheduler topologies cluster)
             super->mem-usage (get-super->mem-usage cluster topologies)
             super->cpu-usage (get-super->cpu-usage cluster topologies)]
         (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology5")))
-        (doseq [super (.values supers)] 
+        (doseq [super (.values supers)]
           (let [mem-avail (.getTotalMemory super)
                 mem-used (.get super->mem-usage super)
                 cpu-avail (.getTotalCPU ^SupervisorDetails super)
@@ -606,15 +633,17 @@
           storm-topology1 (.createTopology builder1)
           topology1 (TopologyDetails. "topology1"
                       {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"
+                       TOPOLOGY-SUBMITTER-USER "userA"
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+                       TOPOLOGY-PRIORITY 0}
                       storm-topology1
                       1
                       (mk-ed-map [["spout1" 0 4]]))
           topologies (Topologies. (to-top-map [topology1]))]
+      (.prepare scheduler {"userA" {"cpu" 2000.0 "memory" 400.0}})
       (.schedule scheduler topologies cluster)
       (is (= (.get (.getStatusMap cluster) "topology1") "Fully Scheduled"))
       (is (= (.getAssignedNumWorkers cluster topology1) 4)))
@@ -632,18 +661,20 @@
                        TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
                        TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
                        TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+                       TOPOLOGY-PRIORITY 0}
                       storm-topology1
                       1
                       (mk-ed-map [["spout1" 0 5]]))
           topologies (Topologies. (to-top-map [topology1]))]
+      (.prepare scheduler {})
       (.schedule scheduler topologies cluster)
      ;;spout1 is going to contain 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB
      ;;The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0 MB
      ;;Thus, one executor is not going to be able to get scheduled, failing the scheduling of this topology, and no executors of this topology will be scheduled
       (is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
-      (is (= (.get (.getStatusMap cluster) "topology1")  "Unsuccessful in scheduling")))
-    
+      (is (= (.get (.getStatusMap cluster) "topology1")  "Not all executors successfully scheduled: [[1, 1]]")))
+
     (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
                               "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -652,11 +683,12 @@
           _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
           storm-topology1 (.createTopology builder1)
           conf  {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"
-                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
-                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+                 TOPOLOGY-SUBMITTER-USER "userC"
+                 TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
+                 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
+                 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
+                 TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+                 TOPOLOGY-PRIORITY 0}
           topology1 (TopologyDetails. "topology1"
                       conf
                       storm-topology1
@@ -665,5 +697,5 @@
           topologies (Topologies. (to-top-map [topology1]))]
       (is (thrown? IllegalArgumentException
             (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))
-       
+
   )))
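A note on the pattern above: every (.schedule scheduler topologies cluster) call in this test file is now preceded by (.prepare scheduler {...}), and the map handed to prepare may carry per-user resource guarantees (see the (.prepare scheduler {"userA" {"cpu" 2000.0 "memory" 400.0}}) call). Below is a minimal Java sketch of that same prepare-then-schedule sequence, assembled only from helpers and constructors that appear in the tests in this patch (TestUtilsForResourceAwareScheduler, Cluster, Topologies); it is an illustrative sketch, not code from the commit:

    package backtype.storm.scheduler.resource;

    import backtype.storm.Config;
    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.INimbus;
    import backtype.storm.scheduler.SchedulerAssignmentImpl;
    import backtype.storm.scheduler.SupervisorDetails;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.utils.Time;
    import backtype.storm.utils.Utils;

    import java.util.HashMap;
    import java.util.Map;

    public class RasPrepareScheduleSketch {
        public static void main(String[] args) {
            // A small cluster (4 supervisors, 4 ports each) built with the same helpers the Java tests below use.
            INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
            Map<String, Number> resourceMap = new HashMap<String, Number>();
            resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
            resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
            Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);

            Config config = new Config();
            config.putAll(Utils.readDefaultConfig());
            config.put(Config.TOPOLOGY_SUBMITTER_USER, "userA");

            TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology(
                    "topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
            Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
            topoMap.put(topo1.getId(), topo1);

            Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
            Topologies topologies = new Topologies(topoMap);

            // prepare() now runs before schedule(); the map it receives may also carry
            // RESOURCE_AWARE_SCHEDULER_USER_POOLS entries for per-user cpu/memory guarantees.
            ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
            scheduler.prepare(config);
            scheduler.schedule(topologies, cluster);

            // Per-topology status set by the scheduler, e.g. "Fully Scheduled" or
            // "Not all executors successfully scheduled: ...".
            System.out.println(cluster.getStatusMap().get(topo1.getId()));
        }
    }

The Java tests further down in this patch follow the same sequence before querying rs.getUser(...), rs.getUserMap() and cluster.getStatusMap().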

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 7f193cc..a5e06c3 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -628,9 +628,64 @@ public class TestConfigValidate {
             } catch (IllegalArgumentException Ex) {
             }
         }
+    }
+
+    @Test
+    public void TestResourceAwareSchedulerUserPool() {
+        TestConfig config = new TestConfig();
+        Collection<Object> passCases = new LinkedList<Object>();
+        Collection<Object> failCases = new LinkedList<Object>();
+
+        Map<String, Map<String, Integer>> passCase1 = new HashMap<String, Map<String, Integer>>();
+        passCase1.put("jerry", new HashMap<String, Integer>());
+        passCase1.put("bobby", new HashMap<String, Integer>());
+        passCase1.put("derek", new HashMap<String, Integer>());
+
+        passCase1.get("jerry").put("cpu", 10000);
+        passCase1.get("jerry").put("memory", 20148);
+        passCase1.get("bobby").put("cpu", 20000);
+        passCase1.get("bobby").put("memory", 40148);
+        passCase1.get("derek").put("cpu", 30000);
+        passCase1.get("derek").put("memory", 60148);
+
+        passCases.add(passCase1);
+
+        for (Object value : passCases) {
+            config.put(TestConfig.TEST_MAP_CONFIG_7, value);
+            ConfigValidation.validateFields(config, TestConfig.class);
+        }
+
+        Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
+        failCase1.put("jerry", new HashMap<String, Integer>());
+        failCase1.put("bobby", new HashMap<String, Integer>());
+        failCase1.put("derek", new HashMap<String, Integer>());
 
+        failCase1.get("jerry").put("cpu", 10000);
+        failCase1.get("jerry").put("memory", 20148);
+        failCase1.get("bobby").put("cpu", 20000);
+        failCase1.get("bobby").put("memory", 40148);
+        failCase1.get("derek").put("cpu", 30000);
 
+        Map<String, Map<String, Integer>> failCase2 = new HashMap<String, Map<String, Integer>>();
+        failCase2.put("jerry", new HashMap<String, Integer>());
+        failCase2.put("bobby", new HashMap<String, Integer>());
+        failCase2.put("derek", new HashMap<String, Integer>());
+        failCase2.get("bobby").put("cpu", 20000);
+        failCase2.get("bobby").put("memory", 40148);
+        failCase2.get("derek").put("cpu", 30000);
+        failCase2.get("derek").put("memory", 60148);
 
+        failCases.add(failCase1);
+        failCases.add(failCase2);
+
+        for (Object value : failCases) {
+            try {
+                config.put(TestConfig.TEST_MAP_CONFIG_7, value);
+                ConfigValidation.validateFields(config, TestConfig.class);
+                Assert.fail("Expected Exception not Thrown for value: " + value);
+            } catch (IllegalArgumentException Ex) {
+            }
+        }
     }
 
     public class TestConfig extends HashMap<String, Object> {
@@ -656,5 +711,8 @@ public class TestConfigValidate {
 
         @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
         public static final String TEST_MAP_CONFIG_6 = "test.map.config.6";
+
+        @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class})
+        public static final String TEST_MAP_CONFIG_7 = "test.map.config.7";
     }
 }
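The pass/fail cases above imply that UserResourcePoolEntryValidator accepts a per-user entry only when it is a map carrying both a "cpu" and a "memory" figure: failCase1 drops derek's "memory", and failCase2 leaves jerry's entry empty. Below is a small illustrative sketch of the same check against the real RESOURCE_AWARE_SCHEDULER_USER_POOLS key, under the assumption that this key is wired to the same validator in Config (which is what the user-resource-pools.yaml test in TestResourceAwareScheduler.java further down suggests); it is not part of the patch:

    import backtype.storm.Config;
    import backtype.storm.utils.Utils;
    import backtype.storm.validation.ConfigValidation;

    import java.util.HashMap;
    import java.util.Map;

    public class UserPoolValidationSketch {
        public static void main(String[] args) {
            Config conf = new Config();
            conf.putAll(Utils.readDefaultConfig());

            // One pool entry per user; each entry names both a cpu and a memory guarantee.
            Map<String, Map<String, Number>> pools = new HashMap<String, Map<String, Number>>();
            pools.put("jerry", new HashMap<String, Number>());
            pools.get("jerry").put("cpu", 10000);
            pools.get("jerry").put("memory", 20148);
            conf.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, pools);

            // Both keys present for every user: expected to validate cleanly.
            ConfigValidation.validateFields(conf);

            // Dropping "memory" mirrors failCase1 above and should be rejected
            // with an IllegalArgumentException.
            pools.get("jerry").remove("memory");
            try {
                ConfigValidation.validateFields(conf);
            } catch (IllegalArgumentException expected) {
                System.out.println("rejected as expected: " + expected.getMessage());
            }
        }
    }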

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
new file mode 100644
index 0000000..bea41ff
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.SchedulerAssignmentImpl;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by jerrypeng on 11/11/15.
+ */
+public class Experiment {
+    @Test
+    public void TestMultipleUsers() {
+//        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+//        Map<String, Number> resourceMap = new HashMap<String, Number>();
+//        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
+//        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+//        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
+//        Config config = new Config();
+//        config.putAll(Utils.readDefaultConfig());
+//        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+//        resourceUserPool.put("jerry", new HashMap<String, Number>());
+//        resourceUserPool.get("jerry").put("cpu", 1000);
+//        resourceUserPool.get("jerry").put("memory", 8192.0);
+//
+//        resourceUserPool.put("bobby", new HashMap<String, Number>());
+//        resourceUserPool.get("bobby").put("cpu", 10000.0);
+//        resourceUserPool.get("bobby").put("memory", 32768);
+//
+//        resourceUserPool.put("derek", new HashMap<String, Number>());
+//        resourceUserPool.get("derek").put("cpu", 5000.0);
+//        resourceUserPool.get("derek").put("memory", 16384.0);
+//
+//        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+//        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+//
+//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+//
+//        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+//        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+//        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+//        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+//        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+//
+//        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+//        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+//        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+//        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+//        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+//        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+//
+//        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+//        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+//        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+//        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+//        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+//
+//        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+//        topoMap.put(topo1.getId(), topo1);
+//        topoMap.put(topo2.getId(), topo2);
+//        topoMap.put(topo3.getId(), topo3);
+//        topoMap.put(topo4.getId(), topo4);
+//        topoMap.put(topo5.getId(), topo5);
+//        topoMap.put(topo6.getId(), topo6);
+//        topoMap.put(topo7.getId(), topo7);
+//        topoMap.put(topo8.getId(), topo8);
+//        topoMap.put(topo9.getId(), topo9);
+//        topoMap.put(topo10.getId(), topo10);
+//        topoMap.put(topo11.getId(), topo11);
+//        topoMap.put(topo12.getId(), topo12);
+//        topoMap.put(topo13.getId(), topo13);
+//        topoMap.put(topo14.getId(), topo14);
+//        topoMap.put(topo15.getId(), topo15);
+//
+//        Topologies topologies = new Topologies(topoMap);
+//
+//        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+//
+//        rs.prepare(config);
+//        rs.schedule(topologies, cluster);
+//
+//        for(TopologyDetails topo : topoMap.values()) {
+//            Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+//        }
+//
+//        for(User user : rs.getUserMap().values()) {
+//            Assert.assertEquals(user.getTopologiesPending().size(), 0);
+//            Assert.assertEquals(user.getTopologiesRunning().size(), 5);
+//        }
+
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 1000);
+        resourceUserPool.get("jerry").put("memory", 8192.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 10000.0);
+        resourceUserPool.get("bobby").put("memory", 32768);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 5000.0);
+        resourceUserPool.get("derek").put("memory", 16384.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        int fullyScheduled = 0;
+        for (TopologyDetails topo : topoMap.values()) {
+            if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+                fullyScheduled++;
+            }
+        }
+        Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
+        Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
new file mode 100644
index 0000000..73a8c73
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SchedulerAssignmentImpl;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import backtype.storm.validation.ConfigValidation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class TestResourceAwareScheduler {
+
+    private static final int NUM_SUPS = 20;
+    private static final int NUM_WORKERS_PER_SUP = 4;
+    private final String TOPOLOGY_SUBMITTER = "jerry";
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
+
+    @Test
+    public void TestReadInResourceAwareSchedulerUserPools() {
+
+        Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
+        LOG.info("fromFile: {}", fromFile);
+        ConfigValidation.validateFields(fromFile);
+    }
+
+    @Test
+    public void TestTopologySortedInCorrectOrder() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
+
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 1000);
+        resourceUserPool.get("jerry").put("memory", 8192.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 10000.0);
+        resourceUserPool.get("bobby").put("memory", 32768);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 5000.0);
+        resourceUserPool.get("derek").put("memory", 16384.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        Set<TopologyDetails> queue = rs.getUser("jerry").getTopologiesPending();
+        Assert.assertEquals("check size", queue.size(), 0);
+
+        queue = rs.getUser("jerry").getTopologiesRunning();
+
+        Iterator<TopologyDetails> itr = queue.iterator();
+
+        TopologyDetails topo = itr.next();
+        LOG.info("{} - {}", topo.getName(), queue);
+        Assert.assertEquals("check order", topo.getName(), "topo-4");
+
+        topo = itr.next();
+        LOG.info("{} - {}", topo.getName(), queue);
+        Assert.assertEquals("check order", topo.getName(), "topo-1");
+
+        topo = itr.next();
+        LOG.info("{} - {}", topo.getName(), queue);
+        Assert.assertEquals("check order", topo.getName(), "topo-5");
+
+        topo = itr.next();
+        LOG.info("{} - {}", topo.getName(), queue);
+        Assert.assertEquals("check order", topo.getName(), "topo-3");
+
+        topo = itr.next();
+        LOG.info("{} - {}", topo.getName(), queue);
+        Assert.assertEquals("check order", topo.getName(), "topo-2");
+
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 30, 10);
+        topoMap.put(topo6.getId(), topo6);
+
+        topologies = new Topologies(topoMap);
+        rs = new ResourceAwareScheduler();
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        queue = rs.getUser("jerry").getTopologiesRunning();
+        itr = queue.iterator();
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-6");
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-4");
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-1");
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-5");
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-3");
+
+        topo = itr.next();
+        Assert.assertEquals("check order", topo.getName(), "topo-2");
+
+        queue = rs.getUser("jerry").getTopologiesPending();
+        Assert.assertEquals("check size", queue.size(), 0);
+    }
+
+    @Test
+    public void TestMultipleUsers() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 1000);
+        resourceUserPool.get("jerry").put("memory", 8192.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 10000.0);
+        resourceUserPool.get("bobby").put("memory", 32768);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 5000.0);
+        resourceUserPool.get("derek").put("memory", 16384.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+        TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+        TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+        TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+        TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+        TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+        TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+        topoMap.put(topo6.getId(), topo6);
+        topoMap.put(topo7.getId(), topo7);
+        topoMap.put(topo8.getId(), topo8);
+        topoMap.put(topo9.getId(), topo9);
+        topoMap.put(topo10.getId(), topo10);
+        topoMap.put(topo11.getId(), topo11);
+        topoMap.put(topo12.getId(), topo12);
+        topoMap.put(topo13.getId(), topo13);
+        topoMap.put(topo14.getId(), topo14);
+        topoMap.put(topo15.getId(), topo15);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : topoMap.values()) {
+            Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+        }
+
+        for (User user : rs.getUserMap().values()) {
+            Assert.assertEquals(user.getTopologiesPending().size(), 0);
+            Assert.assertEquals(user.getTopologiesRunning().size(), 5);
+        }
+    }
+
+    @Test
+    public void testHandlingClusterSubscription() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 1000);
+        resourceUserPool.get("jerry").put("memory", 8192.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 10000.0);
+        resourceUserPool.get("bobby").put("memory", 32768);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 5000.0);
+        resourceUserPool.get("derek").put("memory", 16384.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        int fullyScheduled = 0;
+        for (TopologyDetails topo : topoMap.values()) {
+            if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+                fullyScheduled++;
+            }
+        }
+        Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
+        Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
new file mode 100644
index 0000000..e9bf039
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TestUser {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestUser.class);
+
+    @Test
+    public void testAddTopologyToPendingQueue() {
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        User user1 = new User("user1");
+
+        for (TopologyDetails topo : topos) {
+            user1.addTopologyToPendingQueue(topo);
+        }
+
+        Assert.assertEquals("check pending queue size", topos.size(), user1.getTopologiesPending().size());
+
+        List<String> correctOrder = TestUtilsForResourceAwareScheduler.getListOfTopologiesCorrectOrder();
+        Iterator<String> itr = correctOrder.iterator();
+        for (TopologyDetails topo : user1.getTopologiesPending()) {
+            Assert.assertEquals("check order", topo.getName(), itr.next());
+        }
+    }
+
+    @Test
+    public void testMoveTopoFromPendingToRunning() {
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+
+        List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+        User user1 = new User("user1");
+
+        for (TopologyDetails topo : topos) {
+            user1.addTopologyToPendingQueue(topo);
+        }
+
+        int counter = 1;
+        for (TopologyDetails topo : topos) {
+            user1.moveTopoFromPendingToRunning(topo);
+            Assert.assertEquals("check correct size", (topos.size() - counter), user1.getTopologiesPending().size());
+            Assert.assertEquals("check correct size", counter, user1.getTopologiesRunning().size());
+            counter++;
+        }
+    }
+
+    @Test
+    public void testResourcePoolUtilization() {
+
+        Double cpuGuarantee = 400.0;
+        Double memoryGuarantee = 1000.0;
+        Map<String, Double> resourceGuaranteeMap = new HashMap<String, Double>();
+        resourceGuaranteeMap.put("cpu", cpuGuarantee);
+        resourceGuaranteeMap.put("memory", memoryGuarantee);
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9);
+
+        User user1 = new User("user1", resourceGuaranteeMap);
+
+        user1.addTopologyToRunningQueue(topo1);
+
+        Assert.assertEquals("check cpu resource guarantee", cpuGuarantee, user1.getCPUResourceGuaranteed(), 0.001);
+        Assert.assertEquals("check memory resource guarantee", memoryGuarantee, user1.getMemoryResourceGuaranteed(), 0.001);
+
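+        // topo-1 has 3 executors (2 spout + 1 bolt), each requesting 100% of a core and 400 MB of memory (200 on-heap + 200 off-heap)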
+        Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization().doubleValue(), 0.001);
+        Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization().doubleValue(), 0.001);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
new file mode 100644
index 0000000..d4177c3
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class TestUtilsForResourceAwareScheduler {
+    private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
+
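+    // Builds ten test topologies with identical component layouts but different launch times and priorities.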
+    public static List<TopologyDetails> getListOfTopologies(Config config) {
+
+        List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
+
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 0));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 0));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 15));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 8));
+        topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 9));
+        return topos;
+    }
+
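+    // The topologies above sorted into expected scheduling order: ascending priority value, ties broken by earlier launch time.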
+    public static List<String> getListOfTopologiesCorrectOrder() {
+        List<String> topos = new LinkedList<String>();
+        topos.add("topo-7");
+        topos.add("topo-6");
+        topos.add("topo-9");
+        topos.add("topo-10");
+        topos.add("topo-8");
+        topos.add("topo-4");
+        topos.add("topo-1");
+        topos.add("topo-5");
+        topos.add("topo-3");
+        topos.add("topo-2");
+        return topos;
+    }
+
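+    // Creates numSup supervisors, each offering numPorts worker slots and the given resource capacities.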
+    public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
+        Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
+        for (int i = 0; i < numSup; i++) {
+            List<Number> ports = new LinkedList<Number>();
+            for (int j = 0; j < numPorts; j++) {
+                ports.add(j);
+            }
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, resourceMap);
+            retList.put(sup.getId(), sup);
+        }
+        return retList;
+    }
+
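+    // Creates one ExecutorDetails per spout/bolt executor and maps it to its component id.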
+    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
+        Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
+        int startTask = 0;
+        int endTask = 1;
+        for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+            for (int i = 0; i < spoutParallelism; i++) {
+                retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+                startTask++;
+                endTask++;
+            }
+        }
+
+        for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+            for (int i = 0; i < boltParallelism; i++) {
+                retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+                startTask++;
+                endTask++;
+            }
+        }
+        return retMap;
+    }
+
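+    // Wraps buildTopology in a TopologyDetails carrying the given name, priority, and launch time.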
+    public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
+                                       int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+
+        Config conf = new Config();
+        conf.putAll(config);
+        conf.put(Config.TOPOLOGY_PRIORITY, priority);
+        conf.put(Config.TOPOLOGY_NAME, name);
+        StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
+        TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
+                0,
+                genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
+        return topo;
+    }
+
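+    // Builds a topology of numSpout TestSpouts and numBolt TestBolts connected by shuffle groupings.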
+    public static StormTopology buildTopology(int numSpout, int numBolt,
+                                              int spoutParallelism, int boltParallelism) {
+        LOG.debug("buildTopology with -> numSpout: " + numSpout + " spoutParallelism: "
+                + spoutParallelism + " numBolt: "
+                + numBolt + " boltParallelism: " + boltParallelism);
+        TopologyBuilder builder = new TopologyBuilder();
+
+        for (int i = 0; i < numSpout; i++) {
+            SpoutDeclarer s1 = builder.setSpout("spout-" + i, new TestSpout(),
+                    spoutParallelism);
+        }
+        int j = 0;
+        for (int i = 0; i < numBolt; i++) {
+            // wire bolts to spouts round-robin, wrapping back to spout-0 once every spout has a downstream bolt
+            if (j >= numSpout) {
+                j = 0;
+            }
+            BoltDeclarer b1 = builder.setBolt("bolt-" + i, new TestBolt(),
+                    boltParallelism).shuffleGrouping("spout-" + j);
+            j++;
+        }
+
+        return builder.createTopology();
+    }
+
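+    // Simple spout that periodically emits a random word; used only to give test topologies real components.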
+    public static class TestSpout extends BaseRichSpout {
+        boolean _isDistributed;
+        SpoutOutputCollector _collector;
+
+        public TestSpout() {
+            this(true);
+        }
+
+        public TestSpout(boolean isDistributed) {
+            _isDistributed = isDistributed;
+        }
+
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+        }
+
+        public void close() {
+
+        }
+
+        public void nextTuple() {
+            Utils.sleep(100);
+            final String[] words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};
+            final Random rand = new Random();
+            final String word = words[rand.nextInt(words.length)];
+            _collector.emit(new Values(word));
+        }
+
+        public void ack(Object msgId) {
+
+        }
+
+        public void fail(Object msgId) {
+
+        }
+
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            if (!_isDistributed) {
+                Map<String, Object> ret = new HashMap<String, Object>();
+                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
+                return ret;
+            } else {
+                return null;
+            }
+        }
+    }
+
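+    // Pass-through bolt that re-emits each incoming word with "!!!" appended.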
+    public static class TestBolt extends BaseRichBolt {
+        OutputCollector _collector;
+
+        @Override
+        public void prepare(Map conf, TopologyContext context,
+                            OutputCollector collector) {
+            _collector = collector;
+        }
+
+        @Override
+        public void execute(Tuple tuple) {
+            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
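+    // Minimal INimbus stub; only getHostName returns real data, the other callbacks are no-ops.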
+    public static class INimbusTest implements INimbus {
+        @Override
+        public void prepare(Map stormConf, String schedulerLocalDir) {
+
+        }
+
+        @Override
+        public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
+            return null;
+        }
+
+        @Override
+        public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+
+        }
+
+        @Override
+        public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+            if (existingSupervisors.containsKey(nodeId)) {
+                return existingSupervisors.get(nodeId).getHost();
+            }
+            return null;
+        }
+
+        @Override
+        public IScheduler getForcedScheduler() {
+            return null;
+        }
+    }
+}