You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/03/28 21:07:23 UTC
[3/8] storm git commit: Delete the clj code
Delete the clj code
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63406016
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63406016
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63406016
Branch: refs/heads/master
Commit: 6340601684ccd1cbe86d27594f8e32e78ec8a0df
Parents: 7a302e3
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:13:28 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:13:28 2016 -0500
----------------------------------------------------------------------
.../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
1 file changed, 738 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63406016/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
deleted file mode 100644
index 4ca0721..0000000
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ /dev/null
@@ -1,738 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.resource-aware-scheduler-test
- (:use [clojure test])
- (:use [org.apache.storm util config testing])
- (:use [org.apache.storm.internal thrift])
- (:require [org.apache.storm.util :refer [map-val]])
- (:require [org.apache.storm.daemon [nimbus :as nimbus]])
- (:import [org.apache.storm.generated StormTopology]
- [org.apache.storm Config]
- [org.apache.storm.testing TestWordSpout TestWordCounter]
- [org.apache.storm.topology TopologyBuilder]
- [org.apache.storm.utils Utils])
- (:import [org.apache.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
- SchedulerAssignmentImpl Topologies TopologyDetails])
- (:import [org.apache.storm.scheduler.resource RAS_Node RAS_Nodes ResourceAwareScheduler])
- (:import [org.apache.storm Config StormSubmitter])
- (:import [org.apache.storm LocalDRPC LocalCluster])
- (:import [java.util HashMap]))
-
-(defn gen-supervisors [count ports]
- (into {} (for [id (range count)
- :let [supervisor (SupervisorDetails. (str "id" id)
- (str "host" id)
- (list ) (map int (range ports))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 2000.0
- Config/SUPERVISOR_CPU_CAPACITY 400.0})]]
- {(.getId supervisor) supervisor})))
-
-(defn to-top-map [topologies]
- (into {} (for [top topologies] {(.getId top) top})))
-
-(defn ed [id] (ExecutorDetails. (int id) (int id)))
-
-(defn mk-ed-map [arg]
- (into {}
- (for [[name start end] arg]
- (into {}
- (for [at (range start end)]
- {(ed at) name})))))
-(def DEFAULT_PRIORITY_STRATEGY "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy")
-(def DEFAULT_EVICTION_STRATEGY "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy")
-(def DEFAULT_SCHEDULING_STRATEGY "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy")
-
-;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->mem-usage [^Cluster cluster ^Topologies topologies]
- (let [assignments (.values (.getAssignments cluster))
- supers (.values (.getSupervisors cluster))
- super->mem-usage (HashMap.)
- _ (doseq [super supers]
- (.put super->mem-usage super 0))] ;; initialize the mem-usage as 0 for all supers
- (doseq [assignment assignments]
- (let [ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- topology (.getById topologies (.getTopologyId assignment))
- super->mem-pertopo (map-val (fn [eds]
- (reduce + (map #(.getTotalMemReqTask topology %) eds)))
- super->eds)] ;; sum up the one topo's eds' mem usage on a super
- (doseq [[super mem] super->mem-pertopo]
- (.put super->mem-usage
- super (+ mem (.get super->mem-usage super)))))) ;; add all topo's mem usage for each super
- super->mem-usage))
-
-;; get the super->cpu HashMap by counting the eds' cpu usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->cpu-usage [^Cluster cluster ^Topologies topologies]
- (let [assignments (.values (.getAssignments cluster))
- supers (.values (.getSupervisors cluster))
- super->cpu-usage (HashMap.)
- _ (doseq [super supers]
- (.put super->cpu-usage super 0))] ;; initialize the cpu-usage as 0 for all supers
- (doseq [assignment assignments]
- (let [ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- topology (.getById topologies (.getTopologyId assignment))
- super->cpu-pertopo (map-val (fn [eds]
- (reduce + (map #(.getTotalCpuReqTask topology %) eds)))
- super->eds)] ;; sum up the one topo's eds' cpu usage on a super
- (doseq [[super cpu] super->cpu-pertopo]
- (.put super->cpu-usage
- super (+ cpu (.get super->cpu-usage super)))))) ;; add all topo's cpu usage for each super
- super->cpu-usage))
-
-; testing resource/Node class
-(deftest test-node
- (let [supers (gen-supervisors 5 4)
- cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
- topologies (Topologies. (to-top-map []))
- node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
- topology1 (TopologyDetails. "topology1" {} nil 0)
- topology2 (TopologyDetails. "topology2" {} nil 0)]
- (is (= 5 (.size node-map)))
- (let [node (.get node-map "id0")]
- (is (= "id0" (.getId node)))
- (is (= true (.isAlive node)))
- (is (= 0 (.size (.getRunningTopologies node))))
- (is (= true (.isTotallyFree node)))
- (is (= 4 (.totalSlotsFree node)))
- (is (= 0 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 1 1)))
- (is (= 1 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 3 (.totalSlotsFree node)))
- (is (= 1 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 2 2)))
- (is (= 1 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 2 (.totalSlotsFree node)))
- (is (= 2 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 1 1)))
- (is (= 2 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 1 (.totalSlotsFree node)))
- (is (= 3 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 2 2)))
- (is (= 2 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 0 (.totalSlotsFree node)))
- (is (= 4 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.freeAllSlots node)
- (is (= 0 (.size (.getRunningTopologies node))))
- (is (= true (.isTotallyFree node)))
- (is (= 4 (.totalSlotsFree node)))
- (is (= 0 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- )))
-
-(deftest test-sanity-resource-aware-scheduler
- (let [builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
- supers (gen-supervisors 1 2)
- storm-topology (.createTopology builder)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 1
- (mk-ed-map [["wordSpout" 0 1]
- ["wordCountBolt" 1 2]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-topology-with-multiple-spouts
- (let [builder1 (TopologyBuilder.) ;; a topology with multiple spouts
- _ (.setSpout builder1 "wordSpout1" (TestWordSpout.) 1)
- _ (.setSpout builder1 "wordSpout2" (TestWordSpout.) 1)
- _ (doto
- (.setBolt builder1 "wordCountBolt1" (TestWordCounter.) 1)
- (.shuffleGrouping "wordSpout1")
- (.shuffleGrouping "wordSpout2"))
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt2" (TestWordCounter.) 1) "wordCountBolt1")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt3" (TestWordCounter.) 1) "wordCountBolt1")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt4" (TestWordCounter.) 1) "wordCountBolt2")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt5" (TestWordCounter.) 1) "wordSpout2")
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["wordSpout1" 0 1]
- ["wordSpout2" 1 2]
- ["wordCountBolt1" 2 3]
- ["wordCountBolt2" 3 4]
- ["wordCountBolt3" 4 5]
- ["wordCountBolt4" 5 6]
- ["wordCountBolt5" 6 7]]))
- builder2 (TopologyBuilder.) ;; a topology with two unconnected partitions
- _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1)
- _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1)
- storm-topology2 (.createTopology builder1)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 1
- (mk-ed-map [["wordSpoutX" 0 1]
- ["wordSpoutY" 1 2]]))
- supers (gen-supervisors 2 4)
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 7 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (let [assignment (.getAssignmentById cluster "topology2")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-topology-set-memory-and-cpu-load
- (let [builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (doto
- (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
- (.setMemoryLoad 110.0)
- (.setCPULoad 20.0)
- (.shuffleGrouping "wordSpout"))
- supers (gen-supervisors 2 2) ;; to test whether two tasks will be assigned to one or two nodes
- storm-topology (.createTopology builder)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 2
- (mk-ed-map [["wordSpout" 0 1]
- ["wordCountBolt" 1 2]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology2]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology2")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- ;; 4 slots on 1 machine, all executors assigned
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-resource-limitation
- (let [builder (TopologyBuilder.)
- _ (doto (.setSpout builder "wordSpout" (TestWordSpout.) 2)
- (.setMemoryLoad 1000.0 200.0)
- (.setCPULoad 250.0))
- _ (doto (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
- (.shuffleGrouping "wordSpout")
- (.setMemoryLoad 500.0 100.0)
- (.setCPULoad 100.0))
- supers (gen-supervisors 2 2) ;; need at least two nodes to hold these executors
- storm-topology (.createTopology builder)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 2 ;; need two workers, each on one node
- (mk-ed-map [["wordSpout" 0 2]
- ["wordCountBolt" 2 3]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- node-ids (map #(.getNodeId %) assigned-slots)
- executors (.getExecutors assignment)
- epsilon 0.000001
- assigned-ed-mem (sort (map #(.getTotalMemReqTask topology1 %) executors))
- assigned-ed-cpu (sort (map #(.getTotalCpuReqTask topology1 %) executors))
- ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- mem-avail->used (into []
- (for [[super eds] super->eds]
- [(.getTotalMemory super) (reduce + (map #(.getTotalMemReqTask topology1 %) eds))]))
- cpu-avail->used (into []
- (for [[super eds] super->eds]
- [(.getTotalCPU super) (reduce + (map #(.getTotalCpuReqTask topology1 %) eds))]))]
- ;; 4 slots on 1 machine, all executors assigned
- (is (= 2 (.size assigned-slots))) ;; executor0 resides one one worker (on one), executor1 and executor2 on another worker (on the other node)
- (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 3 (.size executors)))
- ;; make sure resource (mem/cpu) assigned equals to resource specified
- (is (< (Math/abs (- 600.0 (first assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 1200.0 (second assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 1200.0 (last assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon))
- (is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon))
- (is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon))
- (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total
- (is (>= avail used)))
- (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
- (is (>= avail used))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-scheduling-resilience
- (let [supers (gen-supervisors 2 2)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 3 ;; three workers to hold three executors
- (mk-ed-map [["spout1" 0 3]]))
- builder2 (TopologyBuilder.)
- _ (.setSpout builder2 "spout2" (TestWordSpout.) 2)
- storm-topology2 (.createTopology builder2)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 2 ;; two workers, each holds one executor and resides on one node
- (mk-ed-map [["spout2" 0 2]]))
- scheduler (ResourceAwareScheduler.)]
-
- (testing "When a worker fails, RAS does not alter existing assignments on healthy workers"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology2]))
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- assignment (.getAssignmentById cluster "topology2")
- failed-worker (first (vec (.getSlots assignment))) ;; choose a worker to mock as failed
- ed->slot (.getExecutorToSlot assignment)
- failed-eds (.get (clojurify-structure (Utils/reverseMap ed->slot)) failed-worker)
- _ (doseq [ed failed-eds] (.remove ed->slot ed)) ;; remove executor details assigned to the worker
- copy-old-mapping (HashMap. ed->slot)
- healthy-eds (.keySet copy-old-mapping)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- new-assignment (.getAssignmentById cluster "topology2")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- ;; for each executor that was scheduled on healthy workers, their slots should remain unchanged after a new scheduling
- (doseq [ed healthy-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
- (testing "When a supervisor fails, RAS does not alter existing assignments"
- (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
- {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0) ;; worker 0 on the failed super
- (ExecutorDetails. 1 1) (WorkerSlot. "id0" 1) ;; worker 1 on the failed super
- (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; worker 2 on the health super
- cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- copy-old-mapping (HashMap. ed->slot)
- existing-eds (.keySet copy-old-mapping) ;; all the three eds on three workers
- new-cluster (Cluster. (nimbus/standalone-nimbus)
- (dissoc supers "id0") ;; mock the super0 as a failed supervisor
- (.getAssignments cluster)
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
- new-assignment (.getAssignmentById new-cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed existing-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1")))))
-
- (testing "When a supervisor and a worker on it fails, RAS does not alter existing assignments"
- (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
- {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 1) ;; the worker to orphan
- (ExecutorDetails. 1 1) (WorkerSlot. "id0" 2) ;; the worker to kill
- (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; the healthy worker
- cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- _ (.remove ed->slot (ExecutorDetails. 1 1)) ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing
- copy-old-mapping (HashMap. ed->slot)
- existing-eds (.keySet copy-old-mapping) ;; namely the two eds on the orphaned worker and the healthy worker
- new-cluster (Cluster. (nimbus/standalone-nimbus)
- (dissoc supers "id0") ;; mock the super0 as a failed supervisor
- (.getAssignments cluster)
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies new-cluster)
- new-assignment (.getAssignmentById new-cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed existing-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap new-cluster) "topology1")))))
-
- (testing "Scheduling a new topology does not disturb other assignments unnecessarily"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- copy-old-mapping (HashMap. ed->slot)
- new-topologies (Topologies. (to-top-map [topology1 topology2])) ;; a second topology joins
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler new-topologies cluster)
- new-assignment (.getAssignmentById cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed (.keySet copy-old-mapping)]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) ;; the assignment for topo1 should not change
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))))
-
-;; Automated tests for heterogeneous cluster
-(deftest test-heterogeneous-cluster
- (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
- (map int (list 1 2 3 4))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0
- Config/SUPERVISOR_CPU_CAPACITY 800.0})
- (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
- (map int (list 1 2 3 4))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0
- Config/SUPERVISOR_CPU_CAPACITY 200.0})]]
- {(.getId super) super}))
- builder1 (TopologyBuilder.) ;; topo1 has one single huge task that can not be handled by the small-super
- _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
- (.setMemoryLoad 2000.0 48.0)
- (.setCPULoad 300.0))
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 1]]))
- builder2 (TopologyBuilder.) ;; topo2 has 4 large tasks
- _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4)
- (.setMemoryLoad 500.0 12.0)
- (.setCPULoad 100.0))
- storm-topology2 (.createTopology builder2)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 2
- (mk-ed-map [["spout2" 0 4]]))
- builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
- _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
- (.setMemoryLoad 200.0 56.0)
- (.setCPULoad 20.0))
- storm-topology3 (.createTopology builder3)
- topology3 (TopologyDetails. "topology3"
- {TOPOLOGY-NAME "topology-name-3"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology3
- 2
- (mk-ed-map [["spout3" 0 4]]))
- builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity
- _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
- (.setMemoryLoad 100.0 0.0)
- (.setCPULoad 30.0))
- storm-topology4 (.createTopology builder4)
- topology4 (TopologyDetails. "topology4"
- {TOPOLOGY-NAME "topology-name-4"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology4
- 2
- (mk-ed-map [["spout4" 0 12]]))
- builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in teh cluster
- _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
- (.setMemoryLoad 100.0 28.0)
- (.setCPULoad 25.0))
- storm-topology5 (.createTopology builder5)
- topology5 (TopologyDetails. "topology5"
- {TOPOLOGY-NAME "topology-name-5"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology5
- 2
- (mk-ed-map [["spout5" 0 40]]))
- epsilon 0.000001
- topologies (Topologies. (to-top-map [topology1 topology2]))]
-
- (testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- super->mem-usage (get-super->mem-usage cluster topologies)
- super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology3")))
- (doseq [super (.values supers)]
- (let [mem-avail (.getTotalMemory super)
- mem-used (.get super->mem-usage super)
- cpu-avail (.getTotalCPU super)
- cpu-used (.get super->cpu-usage super)]
- (is (or (<= (Math/abs (- mem-avail mem-used)) epsilon)
- (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))
-
- (testing "Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")) 1 0)
- scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")) 1 0))
- scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology4")) 1 0))]
- (is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled
-
- (testing "Launch topo5 only, both mem and cpu should be exactly used up"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology5]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- super->mem-usage (get-super->mem-usage cluster topologies)
- super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology5")))
- (doseq [super (.values supers)]
- (let [mem-avail (.getTotalMemory super)
- mem-used (.get super->mem-usage super)
- cpu-avail (.getTotalCPU ^SupervisorDetails super)
- cpu-used (.get super->cpu-usage super)]
- (is (and (<= (Math/abs (- mem-avail mem-used)) epsilon)
- (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))))
-
-(deftest test-topology-worker-max-heap-size
- (let [supers (gen-supervisors 2 2)]
- (testing "test if RAS will spread executors across mulitple workers based on the set limit for a worker used by the topology")
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- scheduler (ResourceAwareScheduler.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userA"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 4]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (is (= (.get (.getStatusMap cluster) "topology1") "Running - Fully Scheduled by DefaultResourceAwareStrategy"))
- (is (= (.getAssignedNumWorkers cluster topology1) 4)))
- (testing "test when no more workers are available due to topology worker max heap size limit but there is memory is still available")
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- scheduler (ResourceAwareScheduler.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 5]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- ;;spout1 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB
- ;;The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0
- ;;Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this topology will be scheduleded
- (is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
- (is (= (.get (.getStatusMap cluster) "topology1") "Not enough resources to schedule - 0/5 executors scheduled")))
-
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- cluster (LocalCluster.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- conf {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- topology1 (TopologyDetails. "topology1"
- conf
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 5]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (is (thrown? IllegalArgumentException
- (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))
-
- )))