You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:37 UTC
[09/23] storm git commit: first initial implementation
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
index 4fff405..3f2610c 100644
--- a/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
+++ b/storm-core/test/clj/backtype/storm/scheduler/resource_aware_scheduler_test.clj
@@ -49,6 +49,9 @@
(into {}
(for [at (range start end)]
{(ed at) name})))))
+(def DEFAULT_PRIORITY_STRATEGY "backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy")
+(def DEFAULT_EVICTION_STRATEGY "backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy")
+(def DEFAULT_SCHEDULING_STRATEGY "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy")
;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super
(defn get-super->mem-usage [^Cluster cluster ^Topologies topologies]
@@ -154,7 +157,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology
1
(mk-ed-map [["wordSpout" 0 1]
@@ -165,7 +169,8 @@
topologies (Topologies. (to-top-map [topology1]))
node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -173,7 +178,7 @@
(is (= 1 (.size assigned-slots)))
(is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
(is (= 2 (.size executors))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
(deftest test-topology-with-multiple-spouts
(let [builder1 (TopologyBuilder.) ;; a topology with multiple spouts
@@ -195,7 +200,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology1
1
(mk-ed-map [["wordSpout1" 0 1]
@@ -216,7 +222,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology2
1
(mk-ed-map [["wordSpoutX" 0 1]
@@ -227,7 +234,8 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2]))
scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -235,14 +243,14 @@
(is (= 1 (.size assigned-slots)))
(is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
(is (= 7 (.size executors))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
(let [assignment (.getAssignmentById cluster "topology2")
assigned-slots (.getSlots assignment)
executors (.getExecutors assignment)]
(is (= 1 (.size assigned-slots)))
(is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
(is (= 2 (.size executors))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
(deftest test-topology-set-memory-and-cpu-load
(let [builder (TopologyBuilder.)
@@ -261,7 +269,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology
2
(mk-ed-map [["wordSpout" 0 1]
@@ -271,7 +280,8 @@
"backtype.storm.testing.AlternateRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology2]))
scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology2")
assigned-slots (.getSlots assignment)
@@ -280,7 +290,7 @@
(is (= 1 (.size assigned-slots)))
(is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
(is (= 2 (.size executors))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
(deftest test-resource-limitation
(let [builder (TopologyBuilder.)
@@ -300,7 +310,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology
2 ;; need two workers, each on one node
(mk-ed-map [["wordSpout" 0 2]
@@ -310,7 +321,8 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1]))
scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
(let [assignment (.getAssignmentById cluster "topology1")
assigned-slots (.getSlots assignment)
@@ -344,7 +356,7 @@
(is (>= avail used)))
(doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
(is (>= avail used))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
(deftest test-scheduling-resilience
(let [supers (gen-supervisors 2 2)
@@ -358,7 +370,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology1
3 ;; three workers to hold three executors
(mk-ed-map [["spout1" 0 3]]))
@@ -372,7 +385,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology2
2 ;; two workers, each holds one executor and resides on one node
(mk-ed-map [["spout2" 0 2]]))
@@ -383,7 +397,8 @@
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology2]))
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
assignment (.getAssignmentById cluster "topology2")
failed-worker (first (vec (.getSlots assignment))) ;; choose a worker to mock as failed
@@ -392,14 +407,15 @@
_ (doseq [ed failed-eds] (.remove ed->slot ed)) ;; remove executor details assigned to the worker
copy-old-mapping (HashMap. ed->slot)
healthy-eds (.keySet copy-old-mapping)
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
new-assignment (.getAssignmentById cluster "topology2")
new-ed->slot (.getExecutorToSlot new-assignment)]
;; for each executor that was scheduled on healthy workers, their slots should remain unchanged after a new scheduling
(doseq [ed healthy-eds]
(is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
(testing "When a supervisor fails, RAS does not alter existing assignments"
(let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
@@ -419,7 +435,8 @@
(.getAssignments cluster)
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
new-assignment (.getAssignmentById new-cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
@@ -446,33 +463,36 @@
(.getAssignments cluster)
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies new-cluster)
new-assignment (.getAssignmentById new-cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
(doseq [ed existing-eds]
(is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1")))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap new-cluster) "topology1")))))
(testing "Scheduling a new topology does not disturb other assignments unnecessarily"
(let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1]))
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
assignment (.getAssignmentById cluster "topology1")
ed->slot (.getExecutorToSlot assignment)
copy-old-mapping (HashMap. ed->slot)
new-topologies (Topologies. (to-top-map [topology1 topology2])) ;; a second topology joins
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler new-topologies cluster)
new-assignment (.getAssignmentById cluster "topology1")
new-ed->slot (.getExecutorToSlot new-assignment)]
(doseq [ed (.keySet copy-old-mapping)]
(is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) ;; the assignment for topo1 should not change
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))))))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))))
;; Automated tests for heterogeneous cluster
(deftest test-heterogeneous-cluster
@@ -497,7 +517,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology1
1
(mk-ed-map [["spout1" 0 1]]))
@@ -513,7 +534,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology2
2
(mk-ed-map [["spout2" 0 4]]))
@@ -529,7 +551,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology3
2
(mk-ed-map [["spout3" 0 4]]))
@@ -545,7 +568,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology4
2
(mk-ed-map [["spout4" 0 12]]))
@@ -561,7 +585,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology5
2
(mk-ed-map [["spout5" 0 40]]))
@@ -574,13 +599,14 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
super->mem-usage (get-super->mem-usage cluster topologies)
super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")))
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology3")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology3")))
(doseq [super (.values supers)]
(let [mem-avail (.getTotalMemory super)
mem-used (.get super->mem-usage super)
@@ -595,11 +621,12 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
- scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")) 1 0)
- scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")) 1 0))
- scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology4")) 1 0))]
+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")) 1 0)
+ scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")) 1 0))
+ scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology4")) 1 0))]
(is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled
(testing "Launch topo5 only, both mem and cpu should be exactly used up"
@@ -608,11 +635,12 @@
"backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"})
topologies (Topologies. (to-top-map [topology5]))
scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {})
+ _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
_ (.schedule scheduler topologies cluster)
super->mem-usage (get-super->mem-usage cluster topologies)
super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology5")))
+ (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology5")))
(doseq [super (.values supers)]
(let [mem-avail (.getTotalMemory super)
mem-used (.get super->mem-usage super)
@@ -638,14 +666,16 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology1
1
(mk-ed-map [["spout1" 0 4]]))
topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {"userA" {"cpu" 2000.0 "memory" 400.0}})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
- (is (= (.get (.getStatusMap cluster) "topology1") "Fully Scheduled"))
+ (is (= (.get (.getStatusMap cluster) "topology1") "Running - Fully Scheduled by DefaultResourceAwareStrategy"))
(is (= (.getAssignedNumWorkers cluster topology1) 4)))
(testing "test when no more workers are available due to topology worker max heap size limit but there is memory is still available")
(let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
@@ -662,18 +692,20 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
storm-topology1
1
(mk-ed-map [["spout1" 0 5]]))
topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {})
+ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
+ RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
(.schedule scheduler topologies cluster)
;;spout1 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB
;;The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0
;;Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this topology will be scheduleded
(is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
- (is (= (.get (.getStatusMap cluster) "topology1") "Not all executors successfully scheduled: [[1, 1]]")))
+ (is (= (.get (.getStatusMap cluster) "topology1") "Not enough resources to schedule - 0/5 executors scheduled")))
(let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
{STORM-NETWORK-TOPOGRAPHY-PLUGIN
@@ -688,7 +720,8 @@
TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0}
+ TOPOLOGY-PRIORITY 0
+ TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
topology1 (TopologyDetails. "topology1"
conf
storm-topology1
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
deleted file mode 100644
index 3ff0af1..0000000
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/Experiment.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.scheduler.resource;
-
-import backtype.storm.Config;
-import backtype.storm.scheduler.Cluster;
-import backtype.storm.scheduler.INimbus;
-import backtype.storm.scheduler.SchedulerAssignmentImpl;
-import backtype.storm.scheduler.SupervisorDetails;
-import backtype.storm.scheduler.Topologies;
-import backtype.storm.scheduler.TopologyDetails;
-import backtype.storm.utils.Time;
-import backtype.storm.utils.Utils;
-import org.junit.Test;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Created by jerrypeng on 11/11/15.
- */
-public class Experiment {
-
- private static final Logger LOG = LoggerFactory.getLogger(Experiment.class);
-
- /**
- * Eviction order:
- * topo-3: since user bobby don't have any resource guarantees and topo-3 is the lowest priority for user bobby
- * topo-2: since user bobby don't have any resource guarantees and topo-2 is the next lowest priority for user bobby
- * topo-5: since user derek has exceeded his resource guarantee while user jerry has not. topo-5 and topo-4 has the same priority
- * but topo-4 was submitted earlier thus we choose that one to evict
- */
- @Test
- public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
- INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
- Map<String, Number> resourceMap = new HashMap<String, Number>();
- resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
- resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
- Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
- Config config = new Config();
- config.putAll(Utils.readDefaultConfig());
- config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
- config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
- config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
- Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
- resourceUserPool.put("jerry", new HashMap<String, Number>());
- resourceUserPool.get("jerry").put("cpu", 300.0);
- resourceUserPool.get("jerry").put("memory", 3000.0);
-
- resourceUserPool.put("derek", new HashMap<String, Number>());
- resourceUserPool.get("derek").put("cpu", 100.0);
- resourceUserPool.get("derek").put("memory", 1000.0);
-
- config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
- Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
-
- config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
-
- TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
-
- config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
-
- TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
- TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
-
- config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
-
- TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
- TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
-
- Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
- topoMap.put(topo2.getId(), topo2);
- topoMap.put(topo3.getId(), topo3);
- topoMap.put(topo4.getId(), topo4);
- topoMap.put(topo5.getId(), topo5);
-
- Topologies topologies = new Topologies(topoMap);
-
- ResourceAwareScheduler rs = new ResourceAwareScheduler();
-
- rs.prepare(config);
- rs.schedule(topologies, cluster);
-
- for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
-
- //user jerry submits another topology
- topoMap.put(topo1.getId(), topo1);
- topologies = new Topologies(topoMap);
- rs.schedule(topologies, cluster);
-
- for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
- Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
- Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
-
- topoMap.put(topo6.getId(), topo6);
- topologies = new Topologies(topoMap);
- rs.schedule(topologies, cluster);
-
- for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
- Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
-
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
-
- topoMap.put(topo7.getId(), topo7);
- topologies = new Topologies(topoMap);
- rs.schedule(topologies, cluster);
-
- for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
-
- for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
- Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
- Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
- Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
- Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
-
- for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
- }
- Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
- Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
- Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
- Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
-
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
- Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index b41b039..ffdf460 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -20,8 +20,10 @@ package backtype.storm.scheduler.resource;
import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
+import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.INimbus;
import backtype.storm.scheduler.IScheduler;
+import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.SchedulerAssignmentImpl;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
@@ -66,6 +68,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
config.put(Config.TOPOLOGY_SUBMITTER_USER, TOPOLOGY_SUBMITTER);
@@ -175,6 +180,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
resourceUserPool.put("jerry", new HashMap<String, Number>());
resourceUserPool.get("jerry").put("cpu", 1000);
@@ -240,7 +248,7 @@ public class TestResourceAwareScheduler {
rs.schedule(topologies, cluster);
for (TopologyDetails topo : topoMap.values()) {
- Assert.assertEquals(cluster.getStatusMap().get(topo.getId()), "Fully Scheduled");
+ Assert.assertTrue(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
for (User user : rs.getUserMap().values()) {
@@ -258,6 +266,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
resourceUserPool.put("jerry", new HashMap<String, Number>());
resourceUserPool.get("jerry").put("cpu", 1000);
@@ -316,6 +327,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -432,6 +446,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -539,6 +556,9 @@ public class TestResourceAwareScheduler {
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
Config config = new Config();
config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
@@ -653,7 +673,7 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
- Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
}
Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
@@ -698,4 +718,459 @@ public class TestResourceAwareScheduler {
Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
}
+
+ /**
+ * If topologies from other users cannot be evicted to make space (bobby and derek use exactly their guarantees),
+ * check that a lower-priority topology of the current user is evicted instead. A lower priority value means a higher priority.
+ */
+ @Test
+ public void TestEvictTopologyFromItself() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ // the guarantees below add up to 400.0 cpu / 4000.0 memory across jerry, bobby and derek
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 200.0);
+ resourceUserPool.get("jerry").put("memory", 2000.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 100.0);
+ resourceUserPool.get("bobby").put("memory", 1000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 100.0);
+ resourceUserPool.get("derek").put("memory", 1000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ // topo-3 (priority 30) and topo-4 (priority 10) are only submitted later in the test, one at a time
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo5.getId(), topo5);
+ topoMap.put(topo6.getId(), topo6);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //user jerry submits another topology into a full cluster
+ // topo-3 (priority 30) ranks below all of jerry's running topologies, so it should not get scheduled
+ topoMap.put(topo3.getId(), topo3);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+ //make sure that topo-3 didn't get scheduled.
+ Assert.assertEquals("correct topology in attempted queue", "topo-3", rs.getUser("jerry").getTopologiesAttempted().iterator().next().getName());
+
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+ //user jerry submits another topology but this one should be scheduled since it has higher priority than the
+ //rest of jerry's running topologies
+ topoMap.put(topo4.getId(), topo4);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+ Assert.assertTrue("correct topology in attempted queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("jerry").getTopologiesAttempted()) != null);
+ //Either topo-1 or topo-2 should have gotten evicted
+ Assert.assertTrue("correct topology in attempted queue", ((TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-1", rs.getUser("jerry").getTopologiesAttempted())) != null)
+ || (TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("jerry").getTopologiesAttempted()) != null));
+ //assert that topo-4 got scheduled
+ Assert.assertTrue("correct topology in running queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-4", rs.getUser("jerry").getTopologiesRunning()) != null);
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ }
+
+ /**
+ * Users may run topologies beyond their guarantees while the cluster has free capacity. Check that user derek,
+ * already running above its guarantee, makes room for a new higher-priority topology by evicting its own
+ * lower-priority topology, and that derek (whose 25.0 cpu guarantee is the smallest) is fully evicted once jerry submits topo-2.
+ */
+ @Test
+ public void TestOverGuaranteeEviction() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 70.0);
+ resourceUserPool.get("jerry").put("memory", 700.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 100.0);
+ resourceUserPool.get("bobby").put("memory", 1000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 25.0);
+ resourceUserPool.get("derek").put("memory", 250.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //user derek submits topo-6 (priority 10) into a full cluster while already above its guarantee;
+ //topo-6 should be scheduled by evicting derek's own lower-priority topo-5 (priority 30)
+ topoMap.put(topo6.getId(), topo6);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+ //topo5 will be evicted since topo6 has higher priority
+ Assert.assertEquals("correct topology in attempted queue", "topo-5", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+ //user jerry submits topo-2; derek's remaining topology (topo-6) should be evicted to make room
+ topoMap.put(topo2.getId(), topo2);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+ Assert.assertTrue("correct topology in attempted queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-5", rs.getUser("derek").getTopologiesAttempted()) != null);
+ Assert.assertTrue("correct topology in attempted queue", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-6", rs.getUser("derek").getTopologiesAttempted()) != null);
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ }
+
+ /**
+ * Test correct behaviour when a supervisor dies. Check if the scheduler handles it correctly and evicts the correct
+ * topology when rescheduling the executors from the died supervisor
+ */
+ @Test
+ public void TestFaultTolerance() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(6, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, backtype.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, backtype.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 50.0);
+ resourceUserPool.get("jerry").put("memory", 500.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 200.0);
+ resourceUserPool.get("bobby").put("memory", 2000.0);
+
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 100.0);
+ resourceUserPool.get("derek").put("memory", 1000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+ topoMap.put(topo6.getId(), topo6);
+
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //fail supervisor
+ SupervisorDetails supFailed = cluster.getSupervisors().values().iterator().next();
+ LOG.info("/***** failing supervisor: {} ****/", supFailed.getHost());
+ supMap.remove(supFailed.getId());
+ Map<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
+ for (Map.Entry<String, SchedulerAssignment> topoToAssignment : cluster.getAssignments().entrySet()) {
+ String topoId = topoToAssignment.getKey();
+ SchedulerAssignment assignment = topoToAssignment.getValue();
+ Map<ExecutorDetails, WorkerSlot> executorToSlots = new HashMap<ExecutorDetails, WorkerSlot>();
+ for (Map.Entry<ExecutorDetails, WorkerSlot> execToWorker : assignment.getExecutorToSlot().entrySet()) {
+ ExecutorDetails exec = execToWorker.getKey();
+ WorkerSlot ws = execToWorker.getValue();
+ if (!ws.getNodeId().equals(supFailed.getId())) {
+ executorToSlots.put(exec, ws);
+ }
+ }
+ newAssignments.put(topoId, new SchedulerAssignmentImpl(topoId, executorToSlots));
+ }
+ Map<String, String> statusMap = cluster.getStatusMap();
+ cluster = new Cluster(iNimbus, supMap, newAssignments, config);
+ cluster.setStatusMap(statusMap);
+
+ rs.schedule(topologies, cluster);
+
+ //The failed supervisor hosts an executor from topo-6 of user derek. A topology from user jerry should be evicted instead, since jerry will be further above his resource guarantee than derek
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
index e9bf039..24ff980 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUser.java
@@ -104,8 +104,8 @@ public class TestUser {
Assert.assertEquals("check cpu resource guarantee", cpuGuarantee, user1.getCPUResourceGuaranteed(), 0.001);
Assert.assertEquals("check memory resource guarantee", memoryGuarantee, user1.getMemoryResourceGuaranteed(), 0.001);
- Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization().doubleValue(), 0.001);
- Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization().doubleValue(), 0.001);
+ Assert.assertEquals("check cpu resource pool utilization", ((100.0 * 3.0) / cpuGuarantee), user1.getCPUResourcePoolUtilization(), 0.001);
+ Assert.assertEquals("check memory resource pool utilization", ((200.0 + 200.0) * 3.0) / memoryGuarantee, user1.getMemoryResourcePoolUtilization(), 0.001);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/3e832205/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7721300..2098f0c 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -97,31 +97,31 @@ public class TestUtilsForResourceAwareScheduler {
public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, Map resourceMap) {
Map<String, SupervisorDetails> retList = new HashMap<String, SupervisorDetails>();
- for(int i=0; i<numSup; i++) {
+ for (int i = 0; i < numSup; i++) {
List<Number> ports = new LinkedList<Number>();
- for(int j = 0; j<numPorts; j++) {
+ for (int j = 0; j < numPorts; j++) {
ports.add(j);
}
- SupervisorDetails sup = new SupervisorDetails("sup-"+i, "host-"+i, null, ports, resourceMap);
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, resourceMap);
retList.put(sup.getId(), sup);
}
return retList;
}
public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
- Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String> ();
- int startTask=0;
- int endTask=1;
- for(Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
- for(int i=0; i<spoutParallelism; i++) {
+ Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
+ int startTask = 0;
+ int endTask = 1;
+ for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ for (int i = 0; i < spoutParallelism; i++) {
retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
startTask++;
endTask++;
}
}
- for(Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
- for(int i=0; i<boltParallelism; i++) {
+ for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+ for (int i = 0; i < boltParallelism; i++) {
retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
startTask++;
endTask++;
@@ -131,14 +131,14 @@ public class TestUtilsForResourceAwareScheduler {
}
public static TopologyDetails getTopology(String name, Map config, int numSpout, int numBolt,
- int spoutParallelism, int boltParallelism, int launchTime, int priority) {
+ int spoutParallelism, int boltParallelism, int launchTime, int priority) {
Config conf = new Config();
conf.putAll(config);
conf.put(Config.TOPOLOGY_PRIORITY, priority);
conf.put(Config.TOPOLOGY_NAME, name);
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
- StormTopology topology = buildTopology(numSpout,numBolt, spoutParallelism, boltParallelism);
+ StormTopology topology = buildTopology(numSpout, numBolt, spoutParallelism, boltParallelism);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
0,
genExecsAndComps(topology, spoutParallelism, boltParallelism), launchTime);
@@ -258,7 +258,7 @@ public class TestUtilsForResourceAwareScheduler {
@Override
public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) {
- if(existingSupervisors.containsKey(nodeId)) {
+ if (existingSupervisors.containsKey(nodeId)) {
return existingSupervisors.get(nodeId).getHost();
}
return null;
@@ -270,10 +270,10 @@ public class TestUtilsForResourceAwareScheduler {
}
}
- private static boolean isContain(String source, String subItem){
- String pattern = "\\b"+subItem+"\\b";
- Pattern p=Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
- Matcher m=p.matcher(source);
+ private static boolean isContain(String source, String subItem) {
+ String pattern = "\\b" + subItem + "\\b";
+ Pattern p = Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
+ Matcher m = p.matcher(source);
return m.find();
}
@@ -283,8 +283,8 @@ public class TestUtilsForResourceAwareScheduler {
public static TopologyDetails findTopologyInSetFromName(String topoName, Set<TopologyDetails> set) {
TopologyDetails ret = null;
- for(TopologyDetails entry : set) {
- if(entry.getName().equals(topoName)) {
+ for (TopologyDetails entry : set) {
+ if (entry.getName().equals(topoName)) {
ret = entry;
}
}