You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:30 UTC
[02/23] storm git commit: adding unit tests for STORM-898
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
index aec297d..4fff405 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
@@ -24,7 +24,7 @@
[backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
SchedulerAssignmentImpl Topologies TopologyDetails])
- (:import [backtype.storm.scheduler.resource RAS_Node ResourceAwareScheduler])
+ (:import [backtype.storm.scheduler.resource RAS_Node RAS_Nodes ResourceAwareScheduler])
(:import [backtype.storm Config StormSubmitter])
(:import [backtype.storm LocalDRPC LocalCluster])
(:import [java.util HashMap]))
@@ -92,12 +92,12 @@
super (+ cpu (.get super->cpu-usage super)))))) ;; add all topo's cpu usage for each super
super->cpu-usage))
-;; testing resource/Node class
+; testing resource/Node class
(deftest test-node
(let [supers (gen-supervisors 5 4)
cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
topologies (Topologies. (to-top-map []))
- node-map (RAS_Node/getAllNodesFrom cluster topologies)
+ node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
topology1 (TopologyDetails. "topology1" {} nil 0)
topology2 (TopologyDetails. "topology2" {} nil 0)]
(is (= 5 (.size node-map)))
@@ -109,31 +109,31 @@
(is (= 4 (.totalSlotsFree node)))
(is (= 0 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology1 (list (ExecutorDetails. 1 1)) cluster)
+ (.assign node topology1 (list (ExecutorDetails. 1 1)))
(is (= 1 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 3 (.totalSlotsFree node)))
(is (= 1 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology1 (list (ExecutorDetails. 2 2)) cluster)
+ (.assign node topology1 (list (ExecutorDetails. 2 2)))
(is (= 1 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 2 (.totalSlotsFree node)))
(is (= 2 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology2 (list (ExecutorDetails. 1 1)) cluster)
+ (.assign node topology2 (list (ExecutorDetails. 1 1)))
(is (= 2 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 1 (.totalSlotsFree node)))
(is (= 3 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.assign node topology2 (list (ExecutorDetails. 2 2)) cluster)
+ (.assign node topology2 (list (ExecutorDetails. 2 2)))
(is (= 2 (.size (.getRunningTopologies node))))
(is (= false (.isTotallyFree node)))
(is (= 0 (.totalSlotsFree node)))
(is (= 4 (.totalSlotsUsed node)))
(is (= 4 (.totalSlots node)))
- (.freeAllSlots node cluster)
+ (.freeAllSlots node)
(is (= 0 (.size (.getRunningTopologies node))))
(is (= true (.isTotallyFree node)))
(is (= 4 (.totalSlotsFree node)))
@@ -153,7 +153,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology
1
(mk-ed-map [["wordSpout" 0 1]
@@ -162,8 +163,9 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1]))
- node-map (RAS_Node/getAllNodesFrom cluster topologies)
+ node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
scheduler (ResourceAwareScheduler.)]
+ (.prepare scheduler {})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -192,7 +194,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology1
1
(mk-ed-map [["wordSpout1" 0 1]
@@ -212,7 +215,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology2
1
(mk-ed-map [["wordSpoutX" 0 1]
@@ -223,6 +227,7 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2]))
scheduler (ResourceAwareScheduler.)]
+ (.prepare scheduler {})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -255,7 +260,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology
2
(mk-ed-map [["wordSpout" 0 1]
@@ -265,6 +271,7 @@
"backtype.storm.testing.AlternateRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology2]))
scheduler (ResourceAwareScheduler.)]
+ (.prepare scheduler {})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology2")
assigned-slots (.getSlots assignment)
@@ -292,7 +299,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology
2 ;; need two workers, each on one node
(mk-ed-map [["wordSpout" 0 2]
@@ -302,6 +310,7 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1]))
scheduler (ResourceAwareScheduler.)]
+ (.prepare scheduler {})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -331,7 +340,7 @@
(is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon))
(is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon))
(is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon))
- (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total
+ (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total
(is (>= avail used)))
(doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
(is (>= avail used))))
@@ -348,7 +357,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology1
3 ;; three workers to hold three executors
(mk-ed-map [["spout1" 0 3]]))
@@ -361,7 +371,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology2
2 ;; two workers, each holds one executor and resides on one node
(mk-ed-map [["spout2" 0 2]]))
@@ -372,6 +383,7 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology2]))
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
assignment (.getAssignmentById cluster "topology2")
failed-worker (first (vec (.getSlots assignment))) ;; choose a worker to mock as failed
@@ -380,6 +392,7 @@
_ (doseq [ed failed-eds] (.remove ed->slot ed)) ;; remove executor details assigned to the worker
copy-old-mapping (HashMap. ed->slot)
healthy-eds (.keySet copy-old-mapping)
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
new-assignment (.getAssignmentById cluster "topology2")
new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -387,7 +400,7 @@
(doseq [ed healthy-eds]
(is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
(is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
-
+
(testing "When a supervisor fails, RAS does not alter existing assignments"
(let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
{(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0) ;; worker 0 on the failed super
@@ -401,11 +414,12 @@
ed->slot (.getExecutorToSlot assignment)
copy-old-mapping (HashMap. ed->slot)
existing-eds (.keySet copy-old-mapping) ;; all the three eds on three workers
- new-cluster (Cluster. (nimbus/standalone-nimbus)
+ new-cluster (Cluster. (nimbus/standalone-nimbus)
(dissoc supers "id0") ;; mock the super0 as a failed supervisor
(.getAssignments cluster)
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
new-assignment (.getAssignmentById new-cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -427,11 +441,12 @@
_ (.remove ed->slot (ExecutorDetails. 1 1)) ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing
copy-old-mapping (HashMap. ed->slot)
existing-eds (.keySet copy-old-mapping) ;; namely the two eds on the orphaned worker and the healthy worker
- new-cluster (Cluster. (nimbus/standalone-nimbus)
+ new-cluster (Cluster. (nimbus/standalone-nimbus)
(dissoc supers "id0") ;; mock the super0 as a failed supervisor
(.getAssignments cluster)
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies new-cluster)
new-assignment (.getAssignmentById new-cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -444,11 +459,13 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1]))
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
assignment (.getAssignmentById cluster "topology1")
ed->slot (.getExecutorToSlot assignment)
copy-old-mapping (HashMap. ed->slot)
new-topologies (Topologies. (to-top-map [topology1 topology2])) ;; a second topology joins
+ _ (.prepare scheduler {})
_ (.schedule scheduler new-topologies cluster)
new-assignment (.getAssignmentById cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -459,17 +476,17 @@
;; Automated tests for heterogeneous cluster
(deftest test-heterogeneous-cluster
- (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
+ (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
(map int (list 1 2 3 4))
{Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0
Config/SUPERVISOR_CPU_CAPACITY 800.0})
- (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
+ (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
(map int (list 1 2 3 4))
{Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0
Config/SUPERVISOR_CPU_CAPACITY 200.0})]]
{(.getId super) super}))
builder1 (TopologyBuilder.) ;; topo1 has one single huge task that can not be handled by the small-super
- _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
+ _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
(.setMemoryLoad 2000.0 48.0)
(.setCPULoad 300.0))
storm-topology1 (.createTopology builder1)
@@ -479,7 +496,8 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology1
1
(mk-ed-map [["spout1" 0 1]]))
@@ -494,9 +512,10 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology2
- 2
+ 2
(mk-ed-map [["spout2" 0 4]]))
builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
_ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
@@ -509,9 +528,10 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology3
- 2
+ 2
(mk-ed-map [["spout3" 0 4]]))
builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity
_ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
@@ -524,9 +544,10 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology4
- 2
+ 2
(mk-ed-map [["spout4" 0 12]]))
builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in teh cluster
_ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
@@ -539,26 +560,28 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
+ TOPOLOGY-PRIORITY 0}
storm-topology5
- 2
+ 2
(mk-ed-map [["spout5" 0 40]]))
epsilon 0.000001
- topologies (Topologies. (to-top-map [topology1 topology2]))
- scheduler (ResourceAwareScheduler.)]
+ topologies (Topologies. (to-top-map [topology1 topology2]))]
(testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division"
(let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+ scheduler (ResourceAwareScheduler.)
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
super->mem-usage (get-super->mem-usage cluster topologies)
super->cpu-usage (get-super->cpu-usage cluster topologies)]
(is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
(is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
(is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology3")))
- (doseq [super (.values supers)]
+ (doseq [super (.values supers)]
(let [mem-avail (.getTotalMemory super)
mem-used (.get super->mem-usage super)
cpu-avail (.getTotalCPU super)
@@ -571,6 +594,8 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
+ scheduler (ResourceAwareScheduler.)
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")) 1 0)
scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")) 1 0))
@@ -582,11 +607,13 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology5]))
+ scheduler (ResourceAwareScheduler.)
+ _ (.prepare scheduler {})
_ (.schedule scheduler topologies cluster)
super->mem-usage (get-super->mem-usage cluster topologies)
super->cpu-usage (get-super->cpu-usage cluster topologies)]
(is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology5")))
- (doseq [super (.values supers)]
+ (doseq [super (.values supers)]
(let [mem-avail (.getTotalMemory super)
mem-used (.get super->mem-usage super)
cpu-avail (.getTotalCPU ^SupervisorDetails super)
@@ -606,15 +633,17 @@
storm-topology1 (.createTopology builder1)
topology1 (TopologyDetails. "topology1"
{TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
+ TOPOLOGY-SUBMITTER-USER "userA"
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+ TOPOLOGY-PRIORITY 0}
storm-topology1
1
(mk-ed-map [["spout1" 0 4]]))
topologies (Topologies. (to-top-map [topology1]))]
+ (.prepare scheduler {"userA" {"cpu" 2000.0 "memory" 400.0}})
(.schedule scheduler topologies cluster)
(is (= (.get (.getStatusMap cluster) "topology1") "Fully Scheduled"))
(is (= (.getAssignedNumWorkers cluster topology1) 4)))
@@ -632,18 +661,20 @@
TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+ TOPOLOGY-PRIORITY 0}
storm-topology1
1
(mk-ed-map [["spout1" 0 5]]))
topologies (Topologies. (to-top-map [topology1]))]
+ (.prepare scheduler {})
(.schedule scheduler topologies cluster)
;;spout1 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB
;;The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0
;;Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this topology will be scheduleded
(is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
- (is (= (.get (.getStatusMap cluster) "topology1") "Unsuccessful in scheduling")))
-
+ (is (= (.get (.getStatusMap cluster) "topology1") "Not all executors successfully scheduled: [[1, 1]]")))
+
(let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
@@ -652,11 +683,12 @@
_ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
storm-topology1 (.createTopology builder1)
conf {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0}
+ TOPOLOGY-SUBMITTER-USER "userC"
+ TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
+ TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
+ TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
+ TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
+ TOPOLOGY-PRIORITY 0}
topology1 (TopologyDetails. "topology1"
conf
storm-topology1
@@ -665,5 +697,5 @@
topologies (Topologies. (to-top-map [topology1]))]
(is (thrown? IllegalArgumentException
(StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))
-
+
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
index 7f193cc..a5e06c3 100644
--- a/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/backtype/storm/TestConfigValidate.java
@@ -628,9 +628,64 @@ public class TestConfigValidate {
} catch (IllegalArgumentException Ex) {
}
}
+ }
+
+ @Test
+ public void TestResourceAwareSchedulerUserPool() {
+ TestConfig config = new TestConfig();
+ Collection<Object> passCases = new LinkedList<Object>();
+ Collection<Object> failCases = new LinkedList<Object>();
+
+ Map<String, Map<String, Integer>> passCase1 = new HashMap<String, Map<String, Integer>>();
+ passCase1.put("jerry", new HashMap<String, Integer>());
+ passCase1.put("bobby", new HashMap<String, Integer>());
+ passCase1.put("derek", new HashMap<String, Integer>());
+
+ passCase1.get("jerry").put("cpu", 10000);
+ passCase1.get("jerry").put("memory", 20148);
+ passCase1.get("bobby").put("cpu", 20000);
+ passCase1.get("bobby").put("memory", 40148);
+ passCase1.get("derek").put("cpu", 30000);
+ passCase1.get("derek").put("memory", 60148);
+
+ passCases.add(passCase1);
+
+ for (Object value : passCases) {
+ config.put(TestConfig.TEST_MAP_CONFIG_7, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ }
+
+ Map<String, Map<String, Integer>> failCase1 = new HashMap<String, Map<String, Integer>>();
+ failCase1.put("jerry", new HashMap<String, Integer>());
+ failCase1.put("bobby", new HashMap<String, Integer>());
+ failCase1.put("derek", new HashMap<String, Integer>());
+ failCase1.get("jerry").put("cpu", 10000);
+ failCase1.get("jerry").put("memory", 20148);
+ failCase1.get("bobby").put("cpu", 20000);
+ failCase1.get("bobby").put("memory", 40148);
+ failCase1.get("derek").put("cpu", 30000);
+ Map<String, Map<String, Integer>> failCase2 = new HashMap<String, Map<String, Integer>>();
+ failCase2.put("jerry", new HashMap<String, Integer>());
+ failCase2.put("bobby", new HashMap<String, Integer>());
+ failCase2.put("derek", new HashMap<String, Integer>());
+ failCase2.get("bobby").put("cpu", 20000);
+ failCase2.get("bobby").put("memory", 40148);
+ failCase2.get("derek").put("cpu", 30000);
+ failCase2.get("derek").put("memory", 60148);
+ failCases.add(failCase1);
+ failCases.add(failCase2);
+
+ for (Object value : failCases) {
+ try {
+ config.put(TestConfig.TEST_MAP_CONFIG_7, value);
+ ConfigValidation.validateFields(config, TestConfig.class);
+ Assert.fail("Expected Exception not Thrown for value: " + value);
+ } catch (IllegalArgumentException Ex) {
+ }
+ }
}
public class TestConfig extends HashMap<String, Object> {
@@ -656,5 +711,8 @@ public class TestConfigValidate {
@isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {ImpersonationAclUserEntryValidator.class})
public static final String TEST_MAP_CONFIG_6 = "test.map.config.6";
+
+ @isMapEntryCustom(keyValidatorClasses = {StringValidator.class}, valueValidatorClasses = {UserResourcePoolEntryValidator.class})
+ public static final String TEST_MAP_CONFIG_7 = "test.map.config.7";
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
new file mode 100644
index 0000000..bea41ff
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.SchedulerAssignmentImpl;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by jerrypeng on 11/11/15.
+ */
+public class Experiment {
+ @Test
+ public void TestMultipleUsers() {
+// INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+// Map<String, Number> resourceMap = new HashMap<String, Number>();
+// resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
+// resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+// Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
+// Config config = new Config();
+// config.putAll(Utils.readDefaultConfig());
+// Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+// resourceUserPool.put("jerry", new HashMap<String, Number>());
+// resourceUserPool.get("jerry").put("cpu", 1000);
+// resourceUserPool.get("jerry").put("memory", 8192.0);
+//
+// resourceUserPool.put("bobby", new HashMap<String, Number>());
+// resourceUserPool.get("bobby").put("cpu", 10000.0);
+// resourceUserPool.get("bobby").put("memory", 32768);
+//
+// resourceUserPool.put("derek", new HashMap<String, Number>());
+// resourceUserPool.get("derek").put("cpu", 5000.0);
+// resourceUserPool.get("derek").put("memory", 16384.0);
+//
+// config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+// Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+//
+// config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+//
+// TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+// TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+// TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+// TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+// TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+// config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+//
+// TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+// TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+// TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+// TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+// TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+// config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+//
+// TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+// TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+// TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+// TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+// TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+//
+//
+// Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+// topoMap.put(topo1.getId(), topo1);
+// topoMap.put(topo2.getId(), topo2);
+// topoMap.put(topo3.getId(), topo3);
+// topoMap.put(topo4.getId(), topo4);
+// topoMap.put(topo5.getId(), topo5);
+// topoMap.put(topo6.getId(), topo6);
+// topoMap.put(topo7.getId(), topo7);
+// topoMap.put(topo8.getId(), topo8);
+// topoMap.put(topo9.getId(), topo9);
+// topoMap.put(topo10.getId(), topo10);
+// topoMap.put(topo11.getId(), topo11);
+// topoMap.put(topo12.getId(), topo12);
+// topoMap.put(topo13.getId(), topo13);
+// topoMap.put(topo14.getId(), topo14);
+// topoMap.put(topo15.getId(), topo15);
+//
+// Topologies topologies = new Topologies(topoMap);
+//
+// ResourceAwareScheduler rs = new ResourceAwareScheduler();
+//
+// rs.prepare(config);
+// rs.schedule(topologies, cluster);
+//
+// for(TopologyDetails topo : topoMap.values()) {
+// Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+// }
+//
+// for(User user : rs.getUserMap().values()) {
+// Assert.assertEquals(user.getTopologiesPending().size(), 0);
+// Assert.assertEquals(user.getTopologiesRunning().size(), 5);
+// }
+
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 1000);
+ resourceUserPool.get("jerry").put("memory", 8192.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 10000.0);
+ resourceUserPool.get("bobby").put("memory", 32768);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 5000.0);
+ resourceUserPool.get("derek").put("memory", 16384.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ int fullyScheduled = 0;
+ for (TopologyDetails topo : topoMap.values()) {
+ if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+ fullyScheduled++;
+ }
+ }
+ Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
+ Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
new file mode 100644
index 0000000..73a8c73
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SchedulerAssignmentImpl;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import backtype.storm.validation.ConfigValidation;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class TestResourceAwareScheduler {
+
+ private static final int NUM_SUPS = 20;
+ private static final int NUM_WORKERS_PER_SUP = 4;
+ private final String TOPOLOGY_SUBMITTER = "jerry";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
+
+ @Test
+ public void TestReadInResourceAwareSchedulerUserPools() {
+
+ Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
+ LOG.info("fromFile: {}", fromFile);
+ ConfigValidation.validateFields(fromFile);
+ }
+
+ @Test
+ public void TestTopologySortedInCorrectOrder() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
+
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 1000);
+ resourceUserPool.get("jerry").put("memory", 8192.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 10000.0);
+ resourceUserPool.get("bobby").put("memory", 32768);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 5000.0);
+ resourceUserPool.get("derek").put("memory", 16384.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ Set<TopologyDetails> queue = rs.getUser("jerry").getTopologiesPending();
+ Assert.assertEquals("check size", queue.size(), 0);
+
+ queue = rs.getUser("jerry").getTopologiesRunning();
+
+ Iterator<TopologyDetails> itr = queue.iterator();
+
+ TopologyDetails topo = itr.next();
+ LOG.info("{} - {}", topo.getName(), queue);
+ Assert.assertEquals("check order", topo.getName(), "topo-4");
+
+ topo = itr.next();
+ LOG.info("{} - {}", topo.getName(), queue);
+ Assert.assertEquals("check order", topo.getName(), "topo-1");
+
+ topo = itr.next();
+ LOG.info("{} - {}", topo.getName(), queue);
+ Assert.assertEquals("check order", topo.getName(), "topo-5");
+
+ topo = itr.next();
+ LOG.info("{} - {}", topo.getName(), queue);
+ Assert.assertEquals("check order", topo.getName(), "topo-3");
+
+ topo = itr.next();
+ LOG.info("{} - {}", topo.getName(), queue);
+ Assert.assertEquals("check order", topo.getName(), "topo-2");
+
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 30, 10);
+ topoMap.put(topo6.getId(), topo6);
+
+ topologies = new Topologies(topoMap);
+ rs = new ResourceAwareScheduler();
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ queue = rs.getUser("jerry").getTopologiesRunning();
+ itr = queue.iterator();
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-6");
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-4");
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-1");
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-5");
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-3");
+
+ topo = itr.next();
+ Assert.assertEquals("check order", topo.getName(), "topo-2");
+
+ queue = rs.getUser("jerry").getTopologiesPending();
+ Assert.assertEquals("check size", queue.size(), 0);
+ }
+
+ @Test
+ public void TestMultipleUsers() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 1000.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 1000);
+ resourceUserPool.get("jerry").put("memory", 8192.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 10000.0);
+ resourceUserPool.get("bobby").put("memory", 32768);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 5000.0);
+ resourceUserPool.get("derek").put("memory", 16384.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+ TopologyDetails topo8 = TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+ TopologyDetails topo9 = TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+ TopologyDetails topo10 = TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo11 = TestUtilsForResourceAwareScheduler.getTopology("topo-11", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo12 = TestUtilsForResourceAwareScheduler.getTopology("topo-12", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+ TopologyDetails topo13 = TestUtilsForResourceAwareScheduler.getTopology("topo-13", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30);
+ TopologyDetails topo14 = TestUtilsForResourceAwareScheduler.getTopology("topo-14", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20);
+ TopologyDetails topo15 = TestUtilsForResourceAwareScheduler.getTopology("topo-15", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+ topoMap.put(topo6.getId(), topo6);
+ topoMap.put(topo7.getId(), topo7);
+ topoMap.put(topo8.getId(), topo8);
+ topoMap.put(topo9.getId(), topo9);
+ topoMap.put(topo10.getId(), topo10);
+ topoMap.put(topo11.getId(), topo11);
+ topoMap.put(topo12.getId(), topo12);
+ topoMap.put(topo13.getId(), topo13);
+ topoMap.put(topo14.getId(), topo14);
+ topoMap.put(topo15.getId(), topo15);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : topoMap.values()) {
+ Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+ }
+
+ for (User user : rs.getUserMap().values()) {
+ Assert.assertEquals(user.getTopologiesPending().size(), 0);
+ Assert.assertEquals(user.getTopologiesRunning().size(), 5);
+ }
+ }
+
+ @Test
+ public void testHandlingClusterSubscription() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0 * 10);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 1000);
+ resourceUserPool.get("jerry").put("memory", 8192.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 10000.0);
+ resourceUserPool.get("bobby").put("memory", 32768);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 5000.0);
+ resourceUserPool.get("derek").put("memory", 16384.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ int fullyScheduled = 0;
+ for (TopologyDetails topo : topoMap.values()) {
+ if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+ fullyScheduled++;
+ }
+ }
+ Assert.assertEquals("# of Fully scheduled", 1, fullyScheduled);
+ Assert.assertEquals("# of topologies schedule attempted", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
new file mode 100644
index 0000000..e9bf039
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class TestUser {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestUser.class);
+
+ @Test
+ public void testAddTopologyToPendingQueue() {
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+
+ List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+ User user1 = new User("user1");
+
+ for (TopologyDetails topo : topos) {
+ user1.addTopologyToPendingQueue(topo);
+ }
+
+ Assert.assertTrue(user1.getTopologiesPending().size() == topos.size());
+
+ List<String> correctOrder = TestUtilsForResourceAwareScheduler.getListOfTopologiesCorrectOrder();
+ Iterator<String> itr = correctOrder.iterator();
+ for (TopologyDetails topo : user1.getTopologiesPending()) {
+ Assert.assertEquals("check order", topo.getName(), itr.next());
+ }
+ }
+
+ @Test
+ public void testMoveTopoFromPendingToRunning() {
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+
+ List<TopologyDetails> topos = TestUtilsForResourceAwareScheduler.getListOfTopologies(config);
+ User user1 = new User("user1");
+
+ for (TopologyDetails topo : topos) {
+ user1.addTopologyToPendingQueue(topo);
+ }
+
+ int counter = 1;
+ for (TopologyDetails topo : topos) {
+ user1.moveTopoFromPendingToRunning(topo);
+ Assert.assertEquals("check correct size", (topos.size() - counter), user1.getTopologiesPending().size());
+ Assert.assertEquals("check correct size", counter, user1.getTopologiesRunning().size());
+ counter++;
+ }
+ }
+
+ @Test
+ public void testResourcePoolUtilization() {
+
+ Double cpuGuarantee = 400.0;
+ Double memoryGuarantee = 1000.0;
+ Map<String, Double> resourceGuaranteeMap = new HashMap<String, Double>();
+ resourceGuaranteeMap.put("cpu", cpuGuarantee);
+ resourceGuaranteeMap.put("memory", memoryGuarantee);
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 200);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 200);
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 1, 2, 1, Time.currentTimeSecs() - 24, 9);
+
+ User user1 = new User("user1", resourceGuaranteeMap);
+
+ user1.addTopologyToRunningQueue(topo1);
+
+ Assert.assertEquals("check cpu resource guarantee", cpuGuarantee, user1.getCPUResourceGuaranteed(), 0.001);
+ Assert.assertEquals("check memory resource guarantee", memoryGuarantee, user1.getMemoryResourceGuaranteed(), 0.001);
+
+ Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization().doubleValue(), 0.001);
+ Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization().doubleValue(), 0.001);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/3f55feef/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
new file mode 100644
index 0000000..d4177c3
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.scheduler.resource;
+
+import backtype.storm.Config;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.scheduler.ExecutorDetails;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SupervisorDetails;
+import backtype.storm.scheduler.Topologies;
+import backtype.storm.scheduler.TopologyDetails;
+import backtype.storm.scheduler.WorkerSlot;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Time;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class TestUtilsForResourceAwareScheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
+
+ public static List<TopologyDetails> getListOfTopologies(Config config) {
+
+ List<TopologyDetails> topos = new LinkedList<TopologyDetails>();
+
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 20));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 20));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 30));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 5, 15, 1, 1, Time.currentTimeSecs() - 2, 0));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 5, 15, 1, 1, Time.currentTimeSecs() - 8, 0));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-8", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 15));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-9", config, 5, 15, 1, 1, Time.currentTimeSecs() - 16, 8));
+ topos.add(TestUtilsForResourceAwareScheduler.getTopology("topo-10", config, 5, 15, 1, 1, Time.currentTimeSecs() - 24, 9));
+ return topos;
+ }
+
+ public static List<String> getListOfTopologiesCorrectOrder() {
+ List<String> topos = new LinkedList<String>();
+ topos.add("topo-7");
+ topos.add("topo-6");
+ topos.add("topo-9");
+ topos.add("topo-10");
+ topos.add("topo-8");
+ topos.add("topo-4");
+ topos.add("topo-1");
+ topos.add("topo-5");
+ topos.add("topo-3");
+ topos.add("topo-2");
+ return topos;
+ }
+
+ public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
+ Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
+ for(int i=0; i<numSup; i++) {
+ List<Number> ports = new LinkedList<Number>();
+ for(int j = 0; j<numPorts; j++) {
+ ports.add(j);
+ }
+ SupervisorDetails sup = new SupervisorDetails("sup-"+i, "host-"+i, null, ports, resourceMap);
+ retList.put(sup.getId(), sup);
+ }
+ return retList;
+ }
+
+ public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
+ Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String> ();
+ int startTask=0;
+ int endTask=1;
+ for(Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ for(int i=0; i<spoutParallelism; i++) {
+ retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+ startTask++;
+ endTask++;
+ }
+ }
+
+ for(Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+ for(int i=0; i<boltParallelism; i++) {
+ retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
+ startTask++;
+ endTask++;
+ }
+ }
+ return retMap;
+ }
+
+ public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
+ int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+
+ Config conf = new Config();
+ conf.putAll(config);
+ conf.put(Config.TOPOLOGY_PRIORITY, priority);
+ conf.put(Config.TOPOLOGY_NAME, name);
+ StormTopology topology = buildTopology(numSpout,numBolt, spoutParallelism, boltParallelism);
+ TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
+ 0,
+ genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
+ return topo;
+ }
+
+ public static StormTopology buildTopology(int numSpout, int numBolt,
+ int spoutParallelism, int boltParallelism) {
+ LOG.debug("buildTopology with -> numSpout: " + numSpout + " spoutParallelism: "
+ + spoutParallelism + " numBolt: "
+ + numBolt + " boltParallelism: " + boltParallelism);
+ TopologyBuilder builder = new TopologyBuilder();
+
+ for (int i = 0; i < numSpout; i++) {
+ SpoutDeclarer s1 = builder.setSpout("spout-" + i, new TestSpout(),
+ spoutParallelism);
+ }
+ int j = 0;
+ for (int i = 0; i < numBolt; i++) {
+ if (j >= numSpout) {
+ j = 0;
+ }
+ BoltDeclarer b1 = builder.setBolt("bolt-" + i, new TestBolt(),
+ boltParallelism).shuffleGrouping("spout-" + j);
+ }
+
+ return builder.createTopology();
+ }
+
+ public static class TestSpout extends BaseRichSpout {
+ boolean _isDistributed;
+ SpoutOutputCollector _collector;
+
+ public TestSpout() {
+ this(true);
+ }
+
+ public TestSpout(boolean isDistributed) {
+ _isDistributed = isDistributed;
+ }
+
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ }
+
+ public void close() {
+
+ }
+
+ public void nextTuple() {
+ Utils.sleep(100);
+ final String[] words = new String[]{"nathan", "mike", "jackson", "golda", "bertels"};
+ final Random rand = new Random();
+ final String word = words[rand.nextInt(words.length)];
+ _collector.emit(new Values(word));
+ }
+
+ public void ack(Object msgId) {
+
+ }
+
+ public void fail(Object msgId) {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ if (!_isDistributed) {
+ Map<String, Object> ret = new HashMap<String, Object>();
+ ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
+ return ret;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ public static class TestBolt extends BaseRichBolt {
+ OutputCollector _collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context,
+ OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+ }
+
+ public static class INimbusTest implements INimbus {
+ @Override
+ public void prepare(Map stormConf, String schedulerLocalDir) {
+
+ }
+
+ @Override
+ public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
+ return null;
+ }
+
+ @Override
+ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) {
+
+ }
+
+ @Override
+ public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
+ if(existingSupervisors.containsKey(nodeId)) {
+ return existingSupervisors.get(nodeId).getHost();
+ }
+ return null;
+ }
+
+ @Override
+ public IScheduler getForcedScheduler() {
+ return null;
+ }
+ }
+}