You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by zh...@apache.org on 2016/03/28 21:07:21 UTC
[1/8] storm git commit: [STORM-1300] port
backtype.storm.scheduler.resource-aware-scheduler-test to java.
Repository: storm
Updated Branches:
refs/heads/master 89a349eb8 -> da7969ea9
[STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to java.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1b93de1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1b93de1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1b93de1
Branch: refs/heads/master
Commit: c1b93de1650d113df0e1d0493780d9915bf3dacc
Parents: c2cf3be
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:06:00 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:06:00 2016 -0500
----------------------------------------------------------------------
.../resource/TestResourceAwareScheduler.java | 683 ++++++++++++++++++-
.../TestUtilsForResourceAwareScheduler.java | 73 +-
2 files changed, 754 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 78c73a1..e0336ea 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,6 +19,7 @@
package org.apache.storm.scheduler.resource;
import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
@@ -29,11 +30,14 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +48,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
public class TestResourceAwareScheduler {
@@ -54,6 +61,680 @@ public class TestResourceAwareScheduler {
private static int currentTime = 1450418597;
+    /** Baseline configuration that the tests copy into their own {@link Config} via {@code putAll}. */
+    private static final Config defaultTopologyConf = new Config();
+
+    /**
+     * Populates {@link #defaultTopologyConf} once, before any test of this class runs.
+     */
+    @BeforeClass
+    public static void initConf() {
+        final Config conf = defaultTopologyConf;
+
+        // Pluggable scheduler strategies are configured by fully qualified class name.
+        final String evictionStrategy =
+                org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName();
+        final String priorityStrategy =
+                org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName();
+        final String schedulingStrategy =
+                org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName();
+
+        // Cluster-level plugins and strategies.
+        conf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, evictionStrategy);
+        conf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, priorityStrategy);
+        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, schedulingStrategy);
+
+        // Default resource settings for components that do not declare their own load.
+        conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
+        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
+
+        // Topology identity.
+        conf.put(Config.TOPOLOGY_PRIORITY, 0);
+        conf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+    }
+
+    /**
+     * Walks one {@link RAS_Node} through four single-executor assignments (two per topology) and a
+     * final {@code freeAllSlots()}, checking the node's slot and running-topology counters after
+     * every step.
+     */
+    @Test
+    public void testRASNodeSlotAssign() {
+        final int supervisorCount = 5;
+        final int slotsPerSupervisor = 4;
+
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> capacity = new HashMap<>();
+        capacity.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        capacity.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+                TestUtilsForResourceAwareScheduler.genSupervisors(supervisorCount, slotsPerSupervisor, capacity);
+        Topologies noTopologies = new Topologies(new HashMap<String, TopologyDetails>());
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
+
+        Map<String, RAS_Node> nodesById = RAS_Nodes.getAllNodesFrom(cluster, noTopologies);
+        Assert.assertEquals(supervisorCount, nodesById.size());
+
+        // A fresh node is alive and has every slot free.
+        RAS_Node node = nodesById.get("sup-0");
+        Assert.assertEquals("sup-0", node.getId());
+        Assert.assertTrue(node.isAlive());
+        assertSlotCounts(node, 0, 4, 0, slotsPerSupervisor);
+
+        // Two workers for the first topology.
+        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
+        assignSingleExecutor(node, topology1, 1);
+        assertSlotCounts(node, 1, 3, 1, slotsPerSupervisor);
+        assignSingleExecutor(node, topology1, 2);
+        assertSlotCounts(node, 1, 2, 2, slotsPerSupervisor);
+
+        // Two workers for the second topology; the node is full afterwards.
+        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
+        assignSingleExecutor(node, topology2, 1);
+        assertSlotCounts(node, 2, 1, 3, slotsPerSupervisor);
+        assignSingleExecutor(node, topology2, 2);
+        assertSlotCounts(node, 2, 0, 4, slotsPerSupervisor);
+
+        // Freeing everything returns the node to its initial state.
+        node.freeAllSlots();
+        assertSlotCounts(node, 0, 4, 0, slotsPerSupervisor);
+    }
+
+    /** Assigns the one-task executor {@code [task, task]} of {@code topology} to the next free slot of {@code node}. */
+    private static void assignSingleExecutor(RAS_Node node, TopologyDetails topology, int task) {
+        List<ExecutorDetails> executors = new ArrayList<>();
+        executors.add(new ExecutorDetails(task, task));
+        node.assign(node.getFreeSlots().iterator().next(), topology, executors);
+    }
+
+    /** Asserts the running-topology count and the free/used/total slot counters reported by {@code node}. */
+    private static void assertSlotCounts(RAS_Node node, int runningTopologies, int freeSlots, int usedSlots, int totalSlots) {
+        Assert.assertEquals(runningTopologies, node.getRunningTopologies().size());
+        if (usedSlots == 0) {
+            Assert.assertTrue(node.isTotallyFree());
+        } else {
+            Assert.assertFalse(node.isTotallyFree());
+        }
+        Assert.assertEquals(freeSlots, node.totalSlotsFree());
+        Assert.assertEquals(usedSlots, node.totalSlotsUsed());
+        Assert.assertEquals(totalSlots, node.totalSlots());
+    }
+
+    /**
+     * Smoke test: schedules a single small topology on a one-supervisor cluster and expects it to
+     * end up fully scheduled, with its two executors sharing one worker slot on one node.
+     */
+    @Test
+    public void sanityTestOfScheduling() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+
+        Map<String, Number> supervisorCapacity = new HashMap<>();
+        supervisorCapacity.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorCapacity.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+                TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, supervisorCapacity);
+
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
+
+        TopologyDetails topology =
+                TestUtilsForResourceAwareScheduler.getTopology("topology1", conf, 1, 1, 1, 1, 0, 0);
+        Map<String, TopologyDetails> topologiesById = new HashMap<>();
+        topologiesById.put(topology.getId(), topology);
+        Topologies toSchedule = new Topologies(topologiesById);
+
+        scheduler.prepare(conf);
+        scheduler.schedule(toSchedule, cluster);
+
+        // Collect what the scheduler produced for the topology.
+        SchedulerAssignment scheduled = cluster.getAssignmentById(topology.getId());
+        Set<WorkerSlot> usedSlots = scheduled.getSlots();
+        Set<String> usedNodeIds = new HashSet<>();
+        for (WorkerSlot usedSlot : usedSlots) {
+            usedNodeIds.add(usedSlot.getNodeId());
+        }
+        Collection<ExecutorDetails> placedExecutors = scheduled.getExecutors();
+
+        Assert.assertEquals(1, usedSlots.size());
+        Assert.assertEquals(1, usedNodeIds.size());
+        Assert.assertEquals(2, placedExecutors.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
+    }
+
+ @Test
+ public void testTopologyWithMultipleSpouts() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
+ builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
+ builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
+ builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+ builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
+ builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
+
+ TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
+ builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
+ builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ topoMap.put(topology2.getId(), topology2);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots1.size());
+ Assert.assertEquals(1, nodesIDs1.size());
+ Assert.assertEquals(7, executors1.size());
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+
+ SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
+ Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
+ Set<String> nodesIDs2 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots2) {
+ nodesIDs2.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
+
+ Assert.assertEquals(1, assignedSlots2.size());
+ Assert.assertEquals(1, nodesIDs2.size());
+ Assert.assertEquals(2, executors2.size());
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+ }
+
+    /**
+     * Declares explicit CPU and memory loads on a spout and a bolt and checks that the worker
+     * slot chosen by the scheduler reports the summed allocation (400 MB memory, 40 CPU).
+     */
+    @Test
+    public void testTopologySetCpuAndMemLoad() {
+        INimbus nimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> supervisorCapacity = new HashMap<>();
+        supervisorCapacity.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        supervisorCapacity.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supervisors =
+                TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, supervisorCapacity);
+
+        // One spout and one bolt, each declaring its own CPU and memory load.
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
+        builder.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
+        StormTopology stormTopology = builder.createTopology();
+
+        Config conf = new Config();
+        conf.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorToComponent =
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 1, 1);
+        TopologyDetails topology = new TopologyDetails("topology1", conf, stormTopology, 0, executorToComponent, 0);
+
+        Cluster cluster = new Cluster(nimbus, supervisors, new HashMap<String, SchedulerAssignmentImpl>(), conf);
+        ResourceAwareScheduler scheduler = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topologiesById = new HashMap<>();
+        topologiesById.put(topology.getId(), topology);
+        Topologies toSchedule = new Topologies(topologiesById);
+
+        scheduler.prepare(conf);
+        scheduler.schedule(toSchedule, cluster);
+
+        // Sum the resources recorded on every slot the topology was given.
+        SchedulerAssignment scheduled = cluster.getAssignmentById(topology.getId());
+        Set<WorkerSlot> usedSlots = scheduled.getSlots();
+        Set<String> usedNodeIds = new HashSet<>();
+        double totalMemory = 0.0;
+        double totalCpu = 0.0;
+        for (WorkerSlot usedSlot : usedSlots) {
+            usedNodeIds.add(usedSlot.getNodeId());
+            double slotMemory = usedSlot.getAllocatedMemOnHeap() + usedSlot.getAllocatedMemOffHeap();
+            totalMemory += slotMemory;
+            totalCpu += usedSlot.getAllocatedCpu();
+        }
+        Collection<ExecutorDetails> placedExecutors = scheduled.getExecutors();
+
+        Assert.assertEquals(1, usedSlots.size());
+        Assert.assertEquals(1, usedNodeIds.size());
+        Assert.assertEquals(2, placedExecutors.size());
+        Assert.assertEquals(400.0, totalMemory, 0.001);
+        Assert.assertEquals(40.0, totalCpu, 0.001);
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology.getId()));
+    }
+
+ @Test
+ public void testResourceLimitation() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+ TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+ builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
+ builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
+ StormTopology stormTopology1 = builder1.createTopology();
+
+ Config config = new Config();
+ config.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+ Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+ Set<String> nodesIDs1 = new HashSet<>();
+ for (WorkerSlot slot : assignedSlots1) {
+ nodesIDs1.add(slot.getNodeId());
+ }
+ Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+ List<Double> assignedExecutorMemory = new ArrayList<>();
+ List<Double> assignedExecutorCpu = new ArrayList<>();
+ for (ExecutorDetails executor : executors1) {
+ assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
+ assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
+ }
+ Collections.sort(assignedExecutorCpu);
+ Collections.sort(assignedExecutorMemory);
+
+ Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+ Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+ Map<Double, Double> cpuAvailableToUsed = new HashMap();
+ Map<Double, Double> memoryAvailableToUsed = new HashMap();
+
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) {
+ executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+ }
+ for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+ List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+ if (executorsOnSupervisor == null) {
+ executorsOnSupervisor = new ArrayList<>();
+ supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+ }
+ executorsOnSupervisor.add(entry.getKey());
+ }
+ for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+ Double supervisorTotalCpu = entry.getKey().getTotalCPU();
+ Double supervisorTotalMemory = entry.getKey().getTotalMemory();
+ Double supervisorUsedCpu = 0.0;
+ Double supervisorUsedMemory = 0.0;
+ for (ExecutorDetails executor: entry.getValue()) {
+ supervisorUsedMemory += topology1.getTotalCpuReqTask(executor);
+ supervisorTotalCpu += topology1.getTotalMemReqTask(executor);
+ }
+ cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu);
+ memoryAvailableToUsed.put(supervisorTotalMemory, supervisorUsedMemory);
+ }
+ // executor0 resides one one worker (on one), executor1 and executor2 on another worker (on the other node)
+ Assert.assertEquals(2, assignedSlots1.size());
+ Assert.assertEquals(2, nodesIDs1.size());
+ Assert.assertEquals(3, executors1.size());
+
+ Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
+ Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
+ Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
+ Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
+ Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
+ Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
+
+ for (Map.Entry<Double, Double> entry : memoryAvailableToUsed.entrySet()) {
+ Assert.assertTrue(entry.getKey()- entry.getValue() >= 0);
+ }
+ for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
+ Assert.assertTrue(entry.getKey()- entry.getValue() >= 0);
+ }
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ }
+
+    /**
+     * Checks that re-running the scheduler leaves existing executor-to-slot mappings untouched in
+     * four scenarios: a failed worker, a failed supervisor, a failed supervisor combined with a
+     * failed worker, and the arrival of an additional topology.
+     */
+    @Test
+    public void testScheduleResilience() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0);
+
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        // memory requirement is large enough so that two executors can not be fully assigned to one node
+        config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0);
+        // the executors of topology2 are generated from topology2's own StormTopology so that they
+        // are labelled with a component ("wordSpout2") that topology2 actually contains
+        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0);
+
+        // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology2.getId(), topology2);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId());
+        // pick a worker to mock as failed
+        WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
+        Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
+        // collect first and remove afterwards, so the map is not modified while it is being iterated
+        List<ExecutorDetails> failedExecutors = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
+            if (entry.getValue().equals(failedWorker)) {
+                failedExecutors.add(entry.getKey());
+            }
+        }
+        for (ExecutorDetails executor : failedExecutors) {
+            executorToSlot.remove(executor); // remove executor details assigned to the failed worker
+        }
+        Map<ExecutorDetails, WorkerSlot> copyOfOldMapping = new HashMap<>(executorToSlot);
+        Set<ExecutorDetails> healthyExecutors = copyOfOldMapping.keySet();
+
+        rs.schedule(topologies, cluster);
+        SchedulerAssignment newAssignment = cluster.getAssignmentById(topology2.getId());
+        Map<ExecutorDetails, WorkerSlot> newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : healthyExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+        // end of Test1
+
+        // Test2: When a supervisor fails, RAS does not alter existing assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0));
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1));
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
+        Map<String, SchedulerAssignmentImpl> existingAssignments = new HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        Set<ExecutorDetails> existingExecutors = copyOfOldMapping.keySet();
+        Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test2
+
+        // Test3: When a supervisor and a worker on it fails, RAS does not alter existing assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1)); // the worker to orphan
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2)); // the worker that fails
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); // the healthy worker
+        existingAssignments = new HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        // delete one worker of sup-0 (failed) from topo1 assignment to enable actual schedule for testing
+        executorToSlot.remove(new ExecutorDetails(1, 1));
+
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        existingExecutors = copyOfOldMapping.keySet(); // namely the two executors on the orphaned worker and the healthy worker
+        supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test3
+
+        // Test4: Scheduling a new topology does not disturb other assignments unnecessarily
+        cluster1 = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+        assignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        executorToSlot = assignment.getExecutorToSlot();
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+
+        // add the second topology and schedule again on the same cluster
+        topoMap.put(topology2.getId(), topology2);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology2.getId()));
+    }
+
+ @Test
+ public void testHeterogeneousCluster() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap1 = new HashMap<>(); // strong supervisor node
+ resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0);
+ resourceMap1.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4096.0);
+ Map<String, Number> resourceMap2 = new HashMap<>(); // weak supervisor node
+ resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+ resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
+
+ Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>();
+ for (int i = 0; i < 2; i++) {
+ List<Number> ports = new LinkedList<Number>();
+ for (int j = 0; j < 4; j++) {
+ ports.add(j);
+ }
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, (Map)(i == 0 ? resourceMap1 : resourceMap2));
+ supMap.put(sup.getId(), sup);
+ }
+
+ // topo1 has one single huge task that can not be handled by the small-super
+ TopologyBuilder builder1 = new TopologyBuilder();
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(300.0).setMemoryLoad(2000.0, 48.0);
+ StormTopology stormTopology1 = builder1.createTopology();
+ Config config1 = new Config();
+ config1.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 0);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0);
+
+ // topo2 has 4 large tasks
+ TopologyBuilder builder2 = new TopologyBuilder();
+ builder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(100.0).setMemoryLoad(500.0, 12.0);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Config config2 = new Config();
+ config2.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 4, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0);
+
+ // topo3 has 4 large tasks
+ TopologyBuilder builder3 = new TopologyBuilder();
+ builder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(20.0).setMemoryLoad(200.0, 56.0);
+ StormTopology stormTopology3 = builder3.createTopology();
+ Config config3 = new Config();
+ config3.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap3 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3, 4, 0);
+ TopologyDetails topology3 = new TopologyDetails("topology3", config2, stormTopology3, 1, executorMap3, 0);
+
+ // topo4 has 12 small tasks, whose mem usage does not exactly divide a node's mem capacity
+ TopologyBuilder builder4 = new TopologyBuilder();
+ builder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(30.0).setMemoryLoad(100.0, 0.0);
+ StormTopology stormTopology4 = builder4.createTopology();
+ Config config4 = new Config();
+ config4.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap4 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4, 12, 0);
+ TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0);
+
+ // topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster
+ TopologyBuilder builder5 = new TopologyBuilder();
+ builder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(25.0).setMemoryLoad(100.0, 28.0);
+ StormTopology stormTopology5 = builder5.createTopology();
+ Config config5 = new Config();
+ config5.putAll(defaultTopologyConf);
+ Map<ExecutorDetails, String> executorMap5 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5, 40, 0);
+ TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0);
+
+ // Test1: Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ topoMap.put(topology2.getId(), topology2);
+ topoMap.put(topology3.getId(), topology3);
+ Topologies topologies = new Topologies(topoMap);
+ rs.prepare(config1);
+ rs.schedule(topologies, cluster);
+
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
+
+ Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+ Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+
+ final Double EPSILON = 0.0001;
+ for (SupervisorDetails supervisor : supMap.values()) {
+ Double cpuAvailable = supervisor.getTotalCPU();
+ Double memAvailable = supervisor.getTotalMemory();
+ Double cpuUsed = superToCpu.get(supervisor);
+ Double memUsed = superToMem.get(supervisor);
+ Assert.assertTrue((Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON));
+ }
+ // end of Test1
+
+ // Test2: Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled
+ cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+ topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ topoMap.put(topology2.getId(), topology2);
+ topoMap.put(topology4.getId(), topology4);
+ topologies = new Topologies(topoMap);
+ rs.prepare(config1);
+ rs.schedule(topologies, cluster);
+ int numTopologiesAssigned = 0;
+ if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+ numTopologiesAssigned++;
+ }
+ if (cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+ numTopologiesAssigned++;
+ }
+ if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+ numTopologiesAssigned++;
+ }
+ Assert.assertEquals(2, numTopologiesAssigned);
+ //end of Test2
+
+ //Test3: "Launch topo5 only, both mem and cpu should be exactly used up"
+ cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+ topoMap = new HashMap<>();
+ topoMap.put(topology5.getId(), topology5);
+ topologies = new Topologies(topoMap);
+ rs.prepare(config1);
+ rs.schedule(topologies, cluster);
+ superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+ superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+ for (SupervisorDetails supervisor : supMap.values()) {
+ Double cpuAvailable = supervisor.getTotalCPU();
+ Double memAvailable = supervisor.getTotalMemory();
+ Double cpuUsed = superToCpu.get(supervisor);
+ Double memUsed = superToMem.get(supervisor);
+ Assert.assertEquals(cpuAvailable, cpuUsed, 0.0001);
+ Assert.assertEquals(memAvailable, memUsed, 0.0001);
+ }
+ //end of Test3
+ }
+
+ @Test
+ public void testTopologyWorkerMaxHeapSize() {
+ // Test1: verify that RAS spreads executors across multiple workers according to the worker max heap size set by the topology
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap); // presumably 2 supervisors x 2 ports = 4 worker slots - confirm against genSupervisors
+
+ TopologyBuilder builder1 = new TopologyBuilder();
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+ StormTopology stormTopology1 = builder1.createTopology();
+ Config config1 = new Config();
+ config1.putAll(defaultTopologyConf);
+ config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0); // per-worker heap limit under test
+ Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 4, 0);
+ TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ topoMap.put(topology1.getId(), topology1);
+ Topologies topologies = new Topologies(topoMap);
+ rs.prepare(config1);
+ rs.schedule(topologies, cluster);
+ Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+ Assert.assertEquals(4, cluster.getAssignedNumWorkers(topology1)); // 4 executors must end up on 4 separate workers
+
+ // Test2: no more workers are available due to the topology worker max heap size limit, even though memory is still available
+ // wordSpout2 contains 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB
+ // The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0
+ // Thus, one executor cannot be scheduled, which fails the scheduling of this topology, and no executors of this topology will be scheduled
+ TopologyBuilder builder2 = new TopologyBuilder();
+ builder2.setSpout("wordSpout2", new TestWordSpout(), 5);
+ StormTopology stormTopology2 = builder2.createTopology();
+ Config config2 = new Config();
+ config2.putAll(defaultTopologyConf);
+ config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+ Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 5, 0);
+ TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0);
+ cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config2); // fresh cluster: nothing from Test1 stays assigned
+ topoMap = new HashMap<>();
+ topoMap.put(topology2.getId(), topology2);
+ topologies = new Topologies(topoMap);
+ rs.prepare(config2);
+ rs.schedule(topologies, cluster);
+ Assert.assertEquals("Not enough resources to schedule - 0/5 executors scheduled", cluster.getStatusMap().get(topology2.getId()));
+ Assert.assertEquals(5, cluster.getUnassignedExecutors(topology2).size()); // all 5 executors must remain unassigned
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
+ // The topology-wide on-heap memory request per component (129.0) exceeds TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB (128.0),
+ // so submitting the topology is expected to fail with an IllegalArgumentException.
+ Config conf = new Config();
+ conf.putAll(defaultTopologyConf);
+ conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 4);
+ StormTopology oversizedTopology = topologyBuilder.createTopology();
+ StormSubmitter.submitTopologyWithProgressBar("test", conf, oversizedTopology);
+ }
+
@Test
public void TestReadInResourceAwareSchedulerUserPools() {
Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..7cd21ce 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -29,6 +29,8 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.Cluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -50,6 +52,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -109,7 +112,7 @@ public class TestUtilsForResourceAwareScheduler {
public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
int startTask = 0;
- int endTask = 1;
+ int endTask = 0;
for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
for (int i = 0; i < spoutParallelism; i++) {
retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
@@ -285,4 +288,72 @@ public class TestUtilsForResourceAwareScheduler {
}
return ret;
}
+
+ public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
+ Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+ Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
+ Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
+ for (SupervisorDetails supervisor : supervisors) {
+ superToMem.put(supervisor, 0.0);
+ }
+
+ for (SchedulerAssignment assignment : assignments) {
+ Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+ Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+ TopologyDetails topology = topologies.getById(assignment.getTopologyId());
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
+ executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+ }
+ for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+ List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+ if (executorsOnSupervisor == null) {
+ executorsOnSupervisor = new ArrayList<>();
+ supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+ }
+ executorsOnSupervisor.add(entry.getKey());
+ }
+ for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+ Double supervisorUsedMemory = 0.0;
+ for (ExecutorDetails executor: entry.getValue()) {
+ supervisorUsedMemory += topology.getTotalMemReqTask(executor);
+ }
+ superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
+ }
+ }
+ return superToMem;
+ }
+
+ public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
+ Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+ Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
+ Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
+ for (SupervisorDetails supervisor : supervisors) {
+ superToCpu.put(supervisor, 0.0);
+ }
+
+ for (SchedulerAssignment assignment : assignments) {
+ Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+ Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+ TopologyDetails topology = topologies.getById(assignment.getTopologyId());
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
+ executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+ }
+ for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+ List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+ if (executorsOnSupervisor == null) {
+ executorsOnSupervisor = new ArrayList<>();
+ supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+ }
+ executorsOnSupervisor.add(entry.getKey());
+ }
+ for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+ Double supervisorUsedCpu = 0.0;
+ for (ExecutorDetails executor: entry.getValue()) {
+ supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+ }
+ superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+ }
+ }
+ return superToCpu;
+ }
}
[5/8] storm git commit: Modify comments
Posted by zh...@apache.org.
Modify comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/13e8b11a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/13e8b11a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/13e8b11a
Branch: refs/heads/master
Commit: 13e8b11a580cea2e67760d14b6efef9a943b3faf
Parents: 58f1161
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 15:35:54 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 15:35:54 2016 -0500
----------------------------------------------------------------------
.../storm/scheduler/resource/TestResourceAwareScheduler.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/13e8b11a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 9cfdc6e..5ae1432 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -257,9 +257,10 @@ public class TestResourceAwareScheduler {
Map<String, Number> resourceMap = new HashMap<>();
resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+ // to test whether two tasks will be assigned to one or two nodes
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
- TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+ TopologyBuilder builder1 = new TopologyBuilder();
builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
StormTopology stormTopology1 = builder1.createTopology();
[3/8] storm git commit: Delete the clj code
Posted by zh...@apache.org.
Delete the clj code
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63406016
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63406016
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63406016
Branch: refs/heads/master
Commit: 6340601684ccd1cbe86d27594f8e32e78ec8a0df
Parents: 7a302e3
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:13:28 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:13:28 2016 -0500
----------------------------------------------------------------------
.../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
1 file changed, 738 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63406016/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
deleted file mode 100644
index 4ca0721..0000000
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ /dev/null
@@ -1,738 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.resource-aware-scheduler-test
- (:use [clojure test])
- (:use [org.apache.storm util config testing])
- (:use [org.apache.storm.internal thrift])
- (:require [org.apache.storm.util :refer [map-val]])
- (:require [org.apache.storm.daemon [nimbus :as nimbus]])
- (:import [org.apache.storm.generated StormTopology]
- [org.apache.storm Config]
- [org.apache.storm.testing TestWordSpout TestWordCounter]
- [org.apache.storm.topology TopologyBuilder]
- [org.apache.storm.utils Utils])
- (:import [org.apache.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
- SchedulerAssignmentImpl Topologies TopologyDetails])
- (:import [org.apache.storm.scheduler.resource RAS_Node RAS_Nodes ResourceAwareScheduler])
- (:import [org.apache.storm Config StormSubmitter])
- (:import [org.apache.storm LocalDRPC LocalCluster])
- (:import [java.util HashMap]))
-
-(defn gen-supervisors [count ports]
- (into {} (for [id (range count)
- :let [supervisor (SupervisorDetails. (str "id" id)
- (str "host" id)
- (list ) (map int (range ports))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 2000.0
- Config/SUPERVISOR_CPU_CAPACITY 400.0})]]
- {(.getId supervisor) supervisor})))
-
-(defn to-top-map [topologies]
- (into {} (for [top topologies] {(.getId top) top})))
-
-(defn ed [id] (ExecutorDetails. (int id) (int id)))
-
-(defn mk-ed-map [arg]
- (into {}
- (for [[name start end] arg]
- (into {}
- (for [at (range start end)]
- {(ed at) name})))))
-(def DEFAULT_PRIORITY_STRATEGY "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy")
-(def DEFAULT_EVICTION_STRATEGY "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy")
-(def DEFAULT_SCHEDULING_STRATEGY "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy")
-
-;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->mem-usage [^Cluster cluster ^Topologies topologies]
- (let [assignments (.values (.getAssignments cluster))
- supers (.values (.getSupervisors cluster))
- super->mem-usage (HashMap.)
- _ (doseq [super supers]
- (.put super->mem-usage super 0))] ;; initialize the mem-usage as 0 for all supers
- (doseq [assignment assignments]
- (let [ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- topology (.getById topologies (.getTopologyId assignment))
- super->mem-pertopo (map-val (fn [eds]
- (reduce + (map #(.getTotalMemReqTask topology %) eds)))
- super->eds)] ;; sum up the one topo's eds' mem usage on a super
- (doseq [[super mem] super->mem-pertopo]
- (.put super->mem-usage
- super (+ mem (.get super->mem-usage super)))))) ;; add all topo's mem usage for each super
- super->mem-usage))
-
-;; get the super->cpu HashMap by counting the eds' cpu usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->cpu-usage [^Cluster cluster ^Topologies topologies]
- (let [assignments (.values (.getAssignments cluster))
- supers (.values (.getSupervisors cluster))
- super->cpu-usage (HashMap.)
- _ (doseq [super supers]
- (.put super->cpu-usage super 0))] ;; initialize the cpu-usage as 0 for all supers
- (doseq [assignment assignments]
- (let [ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- topology (.getById topologies (.getTopologyId assignment))
- super->cpu-pertopo (map-val (fn [eds]
- (reduce + (map #(.getTotalCpuReqTask topology %) eds)))
- super->eds)] ;; sum up the one topo's eds' cpu usage on a super
- (doseq [[super cpu] super->cpu-pertopo]
- (.put super->cpu-usage
- super (+ cpu (.get super->cpu-usage super)))))) ;; add all topo's cpu usage for each super
- super->cpu-usage))
-
-; testing resource/Node class
-(deftest test-node
- (let [supers (gen-supervisors 5 4)
- cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
- topologies (Topologies. (to-top-map []))
- node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
- topology1 (TopologyDetails. "topology1" {} nil 0)
- topology2 (TopologyDetails. "topology2" {} nil 0)]
- (is (= 5 (.size node-map)))
- (let [node (.get node-map "id0")]
- (is (= "id0" (.getId node)))
- (is (= true (.isAlive node)))
- (is (= 0 (.size (.getRunningTopologies node))))
- (is (= true (.isTotallyFree node)))
- (is (= 4 (.totalSlotsFree node)))
- (is (= 0 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 1 1)))
- (is (= 1 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 3 (.totalSlotsFree node)))
- (is (= 1 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 2 2)))
- (is (= 1 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 2 (.totalSlotsFree node)))
- (is (= 2 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 1 1)))
- (is (= 2 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 1 (.totalSlotsFree node)))
- (is (= 3 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 2 2)))
- (is (= 2 (.size (.getRunningTopologies node))))
- (is (= false (.isTotallyFree node)))
- (is (= 0 (.totalSlotsFree node)))
- (is (= 4 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- (.freeAllSlots node)
- (is (= 0 (.size (.getRunningTopologies node))))
- (is (= true (.isTotallyFree node)))
- (is (= 4 (.totalSlotsFree node)))
- (is (= 0 (.totalSlotsUsed node)))
- (is (= 4 (.totalSlots node)))
- )))
-
-(deftest test-sanity-resource-aware-scheduler
- (let [builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
- supers (gen-supervisors 1 2)
- storm-topology (.createTopology builder)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 1
- (mk-ed-map [["wordSpout" 0 1]
- ["wordCountBolt" 1 2]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-topology-with-multiple-spouts
- (let [builder1 (TopologyBuilder.) ;; a topology with multiple spouts
- _ (.setSpout builder1 "wordSpout1" (TestWordSpout.) 1)
- _ (.setSpout builder1 "wordSpout2" (TestWordSpout.) 1)
- _ (doto
- (.setBolt builder1 "wordCountBolt1" (TestWordCounter.) 1)
- (.shuffleGrouping "wordSpout1")
- (.shuffleGrouping "wordSpout2"))
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt2" (TestWordCounter.) 1) "wordCountBolt1")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt3" (TestWordCounter.) 1) "wordCountBolt1")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt4" (TestWordCounter.) 1) "wordCountBolt2")
- _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt5" (TestWordCounter.) 1) "wordSpout2")
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["wordSpout1" 0 1]
- ["wordSpout2" 1 2]
- ["wordCountBolt1" 2 3]
- ["wordCountBolt2" 3 4]
- ["wordCountBolt3" 4 5]
- ["wordCountBolt4" 5 6]
- ["wordCountBolt5" 6 7]]))
- builder2 (TopologyBuilder.) ;; a topology with two unconnected partitions
- _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1)
- _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1)
- storm-topology2 (.createTopology builder1)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 1
- (mk-ed-map [["wordSpoutX" 0 1]
- ["wordSpoutY" 1 2]]))
- supers (gen-supervisors 2 4)
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 7 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (let [assignment (.getAssignmentById cluster "topology2")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-topology-set-memory-and-cpu-load
- (let [builder (TopologyBuilder.)
- _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
- _ (doto
- (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
- (.setMemoryLoad 110.0)
- (.setCPULoad 20.0)
- (.shuffleGrouping "wordSpout"))
- supers (gen-supervisors 2 2) ;; to test whether two tasks will be assigned to one or two nodes
- storm-topology (.createTopology builder)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 2
- (mk-ed-map [["wordSpout" 0 1]
- ["wordCountBolt" 1 2]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology2]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology2")
- assigned-slots (.getSlots assignment)
- executors (.getExecutors assignment)]
- ;; 4 slots on 1 machine, all executors assigned
- (is (= 1 (.size assigned-slots)))
- (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 2 (.size executors))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-resource-limitation
- (let [builder (TopologyBuilder.)
- _ (doto (.setSpout builder "wordSpout" (TestWordSpout.) 2)
- (.setMemoryLoad 1000.0 200.0)
- (.setCPULoad 250.0))
- _ (doto (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
- (.shuffleGrouping "wordSpout")
- (.setMemoryLoad 500.0 100.0)
- (.setCPULoad 100.0))
- supers (gen-supervisors 2 2) ;; need at least two nodes to hold these executors
- storm-topology (.createTopology builder)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology
- 2 ;; need two workers, each on one node
- (mk-ed-map [["wordSpout" 0 2]
- ["wordCountBolt" 2 3]]))
- cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- scheduler (ResourceAwareScheduler.)]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (let [assignment (.getAssignmentById cluster "topology1")
- assigned-slots (.getSlots assignment)
- node-ids (map #(.getNodeId %) assigned-slots)
- executors (.getExecutors assignment)
- epsilon 0.000001
- assigned-ed-mem (sort (map #(.getTotalMemReqTask topology1 %) executors))
- assigned-ed-cpu (sort (map #(.getTotalCpuReqTask topology1 %) executors))
- ed->super (into {}
- (for [[ed slot] (.getExecutorToSlot assignment)]
- {ed (.getSupervisorById cluster (.getNodeId slot))}))
- super->eds (clojurify-structure (Utils/reverseMap ed->super))
- mem-avail->used (into []
- (for [[super eds] super->eds]
- [(.getTotalMemory super) (reduce + (map #(.getTotalMemReqTask topology1 %) eds))]))
- cpu-avail->used (into []
- (for [[super eds] super->eds]
- [(.getTotalCPU super) (reduce + (map #(.getTotalCpuReqTask topology1 %) eds))]))]
- ;; 2 slots across 2 machines, all 3 executors assigned
- (is (= 2 (.size assigned-slots))) ;; executor0 resides on one worker (on one node), executor1 and executor2 on another worker (on the other node)
- (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
- (is (= 3 (.size executors)))
- ;; make sure resource (mem/cpu) assigned equals to resource specified
- (is (< (Math/abs (- 600.0 (first assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 1200.0 (second assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 1200.0 (last assigned-ed-mem))) epsilon))
- (is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon))
- (is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon))
- (is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon))
- (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total
- (is (>= avail used)))
- (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total
- (is (>= avail used))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-scheduling-resilience
- (let [supers (gen-supervisors 2 2)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 3 ;; three workers to hold three executors
- (mk-ed-map [["spout1" 0 3]]))
- builder2 (TopologyBuilder.)
- _ (.setSpout builder2 "spout2" (TestWordSpout.) 2)
- storm-topology2 (.createTopology builder2)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 2 ;; two workers, each holds one executor and resides on one node
- (mk-ed-map [["spout2" 0 2]]))
- scheduler (ResourceAwareScheduler.)]
-
- (testing "When a worker fails, RAS does not alter existing assignments on healthy workers"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology2]))
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- assignment (.getAssignmentById cluster "topology2")
- failed-worker (first (vec (.getSlots assignment))) ;; choose a worker to mock as failed
- ed->slot (.getExecutorToSlot assignment)
- failed-eds (.get (clojurify-structure (Utils/reverseMap ed->slot)) failed-worker)
- _ (doseq [ed failed-eds] (.remove ed->slot ed)) ;; remove executor details assigned to the worker
- copy-old-mapping (HashMap. ed->slot)
- healthy-eds (.keySet copy-old-mapping)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- new-assignment (.getAssignmentById cluster "topology2")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- ;; for each executor that was scheduled on healthy workers, their slots should remain unchanged after a new scheduling
- (doseq [ed healthy-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
- (testing "When a supervisor fails, RAS does not alter existing assignments"
- (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
- {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0) ;; worker 0 on the failed super
- (ExecutorDetails. 1 1) (WorkerSlot. "id0" 1) ;; worker 1 on the failed super
- (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; worker 2 on the healthy super
- cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- copy-old-mapping (HashMap. ed->slot)
- existing-eds (.keySet copy-old-mapping) ;; all the three eds on three workers
- new-cluster (Cluster. (nimbus/standalone-nimbus)
- (dissoc supers "id0") ;; mock the super0 as a failed supervisor
- (.getAssignments cluster)
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
- new-assignment (.getAssignmentById new-cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed existing-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1")))))
-
- (testing "When a supervisor and a worker on it fails, RAS does not alter existing assignments"
- (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
- {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 1) ;; the worker to orphan
- (ExecutorDetails. 1 1) (WorkerSlot. "id0" 2) ;; the worker to kill
- (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; the healthy worker
- cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- _ (.remove ed->slot (ExecutorDetails. 1 1)) ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing
- copy-old-mapping (HashMap. ed->slot)
- existing-eds (.keySet copy-old-mapping) ;; namely the two eds on the orphaned worker and the healthy worker
- new-cluster (Cluster. (nimbus/standalone-nimbus)
- (dissoc supers "id0") ;; mock the super0 as a failed supervisor
- (.getAssignments cluster)
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies new-cluster)
- new-assignment (.getAssignmentById new-cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed existing-eds]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap new-cluster) "topology1")))))
-
- (testing "Scheduling a new topology does not disturb other assignments unnecessarily"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1]))
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- assignment (.getAssignmentById cluster "topology1")
- ed->slot (.getExecutorToSlot assignment)
- copy-old-mapping (HashMap. ed->slot)
- new-topologies (Topologies. (to-top-map [topology1 topology2])) ;; a second topology joins
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler new-topologies cluster)
- new-assignment (.getAssignmentById cluster "topology1")
- new-ed->slot (.getExecutorToSlot new-assignment)]
- (doseq [ed (.keySet copy-old-mapping)]
- (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) ;; the assignment for topo1 should not change
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))))
-
-;; Automated tests for heterogeneous cluster
-(deftest test-heterogeneous-cluster
- (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
- (map int (list 1 2 3 4))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0
- Config/SUPERVISOR_CPU_CAPACITY 800.0})
- (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
- (map int (list 1 2 3 4))
- {Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0
- Config/SUPERVISOR_CPU_CAPACITY 200.0})]]
- {(.getId super) super}))
- builder1 (TopologyBuilder.) ;; topo1 has one single huge task that can not be handled by the small-super
- _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
- (.setMemoryLoad 2000.0 48.0)
- (.setCPULoad 300.0))
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 1]]))
- builder2 (TopologyBuilder.) ;; topo2 has 4 large tasks
- _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4)
- (.setMemoryLoad 500.0 12.0)
- (.setCPULoad 100.0))
- storm-topology2 (.createTopology builder2)
- topology2 (TopologyDetails. "topology2"
- {TOPOLOGY-NAME "topology-name-2"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology2
- 2
- (mk-ed-map [["spout2" 0 4]]))
- builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
- _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
- (.setMemoryLoad 200.0 56.0)
- (.setCPULoad 20.0))
- storm-topology3 (.createTopology builder3)
- topology3 (TopologyDetails. "topology3"
- {TOPOLOGY-NAME "topology-name-3"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology3
- 2
- (mk-ed-map [["spout3" 0 4]]))
- builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity
- _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
- (.setMemoryLoad 100.0 0.0)
- (.setCPULoad 30.0))
- storm-topology4 (.createTopology builder4)
- topology4 (TopologyDetails. "topology4"
- {TOPOLOGY-NAME "topology-name-4"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology4
- 2
- (mk-ed-map [["spout4" 0 12]]))
- builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in the cluster
- _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
- (.setMemoryLoad 100.0 28.0)
- (.setCPULoad 25.0))
- storm-topology5 (.createTopology builder5)
- topology5 (TopologyDetails. "topology5"
- {TOPOLOGY-NAME "topology-name-5"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology5
- 2
- (mk-ed-map [["spout5" 0 40]]))
- epsilon 0.000001
- topologies (Topologies. (to-top-map [topology1 topology2]))]
-
- (testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- super->mem-usage (get-super->mem-usage cluster topologies)
- super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology3")))
- (doseq [super (.values supers)]
- (let [mem-avail (.getTotalMemory super)
- mem-used (.get super->mem-usage super)
- cpu-avail (.getTotalCPU super)
- cpu-used (.get super->cpu-usage super)]
- (is (or (<= (Math/abs (- mem-avail mem-used)) epsilon)
- (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))
-
- (testing "Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")) 1 0)
- scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")) 1 0))
- scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology4")) 1 0))]
- (is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled
-
- (testing "Launch topo5 only, both mem and cpu should be exactly used up"
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- topologies (Topologies. (to-top-map [topology5]))
- scheduler (ResourceAwareScheduler.)
- _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- _ (.schedule scheduler topologies cluster)
- super->mem-usage (get-super->mem-usage cluster topologies)
- super->cpu-usage (get-super->cpu-usage cluster topologies)]
- (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology5")))
- (doseq [super (.values supers)]
- (let [mem-avail (.getTotalMemory super)
- mem-used (.get super->mem-usage super)
- cpu-avail (.getTotalCPU ^SupervisorDetails super)
- cpu-used (.get super->cpu-usage super)]
- (is (and (<= (Math/abs (- mem-avail mem-used)) epsilon)
- (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))))
-
-(deftest test-topology-worker-max-heap-size
- (let [supers (gen-supervisors 2 2)]
- (testing "test if RAS will spread executors across mulitple workers based on the set limit for a worker used by the topology")
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- scheduler (ResourceAwareScheduler.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userA"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 4]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- (is (= (.get (.getStatusMap cluster) "topology1") "Running - Fully Scheduled by DefaultResourceAwareStrategy"))
- (is (= (.getAssignedNumWorkers cluster topology1) 4)))
- (testing "test when no more workers are available due to topology worker max heap size limit but there is memory is still available")
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- scheduler (ResourceAwareScheduler.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- topology1 (TopologyDetails. "topology1"
- {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 5]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
- RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
- (.schedule scheduler topologies cluster)
- ;;spout1 is going to contain 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB
- ;;The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0
- ;;Thus, one executor cannot be scheduled, which fails the scheduling of this topology, and no executors of this topology will be scheduled
- (is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
- (is (= (.get (.getStatusMap cluster) "topology1") "Not enough resources to schedule - 0/5 executors scheduled")))
-
- (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
- {STORM-NETWORK-TOPOGRAPHY-PLUGIN
- "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
- cluster (LocalCluster.)
- builder1 (TopologyBuilder.)
- _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
- storm-topology1 (.createTopology builder1)
- conf {TOPOLOGY-NAME "topology-name-1"
- TOPOLOGY-SUBMITTER-USER "userC"
- TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
- TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
- TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
- TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
- TOPOLOGY-PRIORITY 0
- TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
- topology1 (TopologyDetails. "topology1"
- conf
- storm-topology1
- 1
- (mk-ed-map [["spout1" 0 5]]))
- topologies (Topologies. (to-top-map [topology1]))]
- (is (thrown? IllegalArgumentException
- (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))
-
- )))
[7/8] storm git commit: Minor
Posted by zh...@apache.org.
Minor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3c79096f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3c79096f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3c79096f
Branch: refs/heads/master
Commit: 3c79096f6295559ee649e88ff3569a6fae0b5723
Parents: 91f8a45
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 16:38:53 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 16:38:53 2016 -0500
----------------------------------------------------------------------
.../scheduler/resource/TestResourceAwareScheduler.java | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3c79096f/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 720f2d6..d81c176 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -432,12 +432,7 @@ public class TestResourceAwareScheduler {
// pick a worker to mock as failed
WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
- List<ExecutorDetails> failedExecutors = new ArrayList<>();
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
- if (entry.getValue().equals(failedWorker)) {
- failedExecutors.add(entry.getKey());
- }
- }
+ Collection<ExecutorDetails> failedExecutors = assignment.getSlotToExecutors().get(failedWorker);
for (ExecutorDetails executor : failedExecutors) {
executorToSlot.remove(executor); // remove executor details assigned to the failed worker
}
@@ -527,7 +522,7 @@ public class TestResourceAwareScheduler {
topologies = new Topologies(topoMap);
rs.schedule(topologies, cluster1);
- newAssignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+ newAssignment = cluster1.getAssignmentById(topology1.getId());
newExecutorToSlot = newAssignment.getExecutorToSlot();
for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
[4/8] storm git commit: Merge two functions
Posted by zh...@apache.org.
Merge two functions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f1161c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f1161c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f1161c
Branch: refs/heads/master
Commit: 58f1161cb077f90fb64e8a68f9da9c9aedf4f7dd
Parents: 6340601
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Mar 24 17:31:13 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Mar 24 17:31:13 2016 -0500
----------------------------------------------------------------------
.../resource/TestResourceAwareScheduler.java | 10 +++--
.../TestUtilsForResourceAwareScheduler.java | 44 ++++----------------
2 files changed, 13 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 28fd491..9cfdc6e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -616,8 +616,9 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
- Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
- Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+ Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+ Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+ TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
final Double EPSILON = 0.0001;
for (SupervisorDetails supervisor : supMap.values()) {
@@ -658,8 +659,9 @@ public class TestResourceAwareScheduler {
topologies = new Topologies(topoMap);
rs.prepare(config1);
rs.schedule(topologies, cluster);
- superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
- superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+ superToCpu = new HashMap<>();
+ superToMem = new HashMap<>();
+ TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
for (SupervisorDetails supervisor : supMap.values()) {
Double cpuAvailable = supervisor.getTotalCPU();
Double memAvailable = supervisor.getTotalMemory();
http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7cd21ce..8b7f9c8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -289,46 +289,14 @@ public class TestUtilsForResourceAwareScheduler {
return ret;
}
- public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
- Map<SupervisorDetails, Double> superToMem = new HashMap<>();
- Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
- Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
- for (SupervisorDetails supervisor : supervisors) {
- superToMem.put(supervisor, 0.0);
- }
-
- for (SchedulerAssignment assignment : assignments) {
- Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
- Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
- TopologyDetails topology = topologies.getById(assignment.getTopologyId());
- for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
- executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
- }
- for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
- List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
- if (executorsOnSupervisor == null) {
- executorsOnSupervisor = new ArrayList<>();
- supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
- }
- executorsOnSupervisor.add(entry.getKey());
- }
- for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
- Double supervisorUsedMemory = 0.0;
- for (ExecutorDetails executor: entry.getValue()) {
- supervisorUsedMemory += topology.getTotalMemReqTask(executor);
- }
- superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
- }
- }
- return superToMem;
- }
-
- public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
- Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+ public static void getSupervisorToResourceUsage(Cluster cluster, Topologies topologies,
+ Map<SupervisorDetails, Double> superToCpu,
+ Map<SupervisorDetails, Double> superToMem) {
Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
for (SupervisorDetails supervisor : supervisors) {
superToCpu.put(supervisor, 0.0);
+ superToMem.put(supervisor, 0.0);
}
for (SchedulerAssignment assignment : assignments) {
@@ -348,12 +316,14 @@ public class TestUtilsForResourceAwareScheduler {
}
for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
Double supervisorUsedCpu = 0.0;
+ Double supervisorUsedMemory = 0.0;
for (ExecutorDetails executor: entry.getValue()) {
supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+ supervisorUsedMemory += topology.getTotalMemReqTask(executor);
}
superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+ superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
}
}
- return superToCpu;
}
}
[6/8] storm git commit: Modify comment
Posted by zh...@apache.org.
Modify comment
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91f8a456
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91f8a456
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91f8a456
Branch: refs/heads/master
Commit: 91f8a456d99f92aabb50898f6935a7b44015bf40
Parents: 13e8b11
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 15:37:41 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 15:37:41 2016 -0500
----------------------------------------------------------------------
.../storm/scheduler/resource/TestResourceAwareScheduler.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/91f8a456/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 5ae1432..720f2d6 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -308,7 +308,7 @@ public class TestResourceAwareScheduler {
resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
- TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+ TopologyBuilder builder1 = new TopologyBuilder();
builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
StormTopology stormTopology1 = builder1.createTopology();
[2/8] storm git commit: Minor
Posted by zh...@apache.org.
Minor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a302e3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a302e3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a302e3b
Branch: refs/heads/master
Commit: 7a302e3bc5b6652642ed5fb9ff6f4fed8607680f
Parents: c1b93de
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:11:50 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:11:50 2016 -0500
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/Config.java | 2 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 ++++++++++----------
.../resource/TestResourceAwareScheduler.java | 1 -
3 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..05030e8 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -232,7 +232,7 @@ public class Config extends HashMap<String, Object> {
/**
* Whether we want to display all the resource capacity and scheduled usage on the UI page.
- * We suggest to have this variable set if you are using any kind of resource-related scheduler.
+ * You MUST have this variable set if you are using any kind of resource-related scheduler.
*
* If this is not set, we will not display resource capacity and usage on the UI.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index c6543d4..ed3d305 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -129,7 +129,7 @@ public class ConfigUtils {
// public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static Map readStormConfig() {
return _instance.readStormConfigImpl();
}
@@ -235,7 +235,7 @@ public class ConfigUtils {
return (masterLocalDir(conf) + FILE_SEPARATOR + "inimbus");
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String supervisorLocalDir(Map conf) throws IOException {
return _instance.supervisorLocalDirImpl(conf);
}
@@ -250,7 +250,7 @@ public class ConfigUtils {
return (supervisorLocalDir(conf) + FILE_SEPARATOR + "isupervisor");
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String supervisorStormDistRoot(Map conf) throws IOException {
return _instance.supervisorStormDistRootImpl(conf);
}
@@ -259,7 +259,7 @@ public class ConfigUtils {
return stormDistPath(supervisorLocalDir(conf));
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String supervisorStormDistRoot(Map conf, String stormId) throws IOException {
return _instance.supervisorStormDistRootImpl(conf, stormId);
}
@@ -299,7 +299,7 @@ public class ConfigUtils {
return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + RESOURCES_SUBDIR);
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static LocalState supervisorState(Map conf) throws IOException {
return _instance.supervisorStateImpl(conf);
}
@@ -308,7 +308,7 @@ public class ConfigUtils {
return new LocalState((supervisorLocalDir(conf) + FILE_SEPARATOR + "localstate"));
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static LocalState nimbusTopoHistoryState(Map conf) throws IOException {
return _instance.nimbusTopoHistoryStateImpl(conf);
}
@@ -317,7 +317,7 @@ public class ConfigUtils {
return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"));
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
return _instance.readSupervisorStormConfImpl(conf, stormId);
}
@@ -380,7 +380,7 @@ public class ConfigUtils {
return ret;
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static void setWorkerUserWSE(Map conf, String workerId, String user) throws IOException {
_instance.setWorkerUserWSEImpl(conf, workerId, user);
}
@@ -401,7 +401,7 @@ public class ConfigUtils {
new File(workerUserFile(conf, workerId)).delete();
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String workerArtifactsRoot(Map conf) {
return _instance.workerArtifactsRootImpl(conf);
}
@@ -447,7 +447,7 @@ public class ConfigUtils {
return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
}
- // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+ // we use this "weird" wrapper pattern temporarily for mocking in clojure test
public static String workerRoot(Map conf) {
return _instance.workerRootImpl(conf);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index e0336ea..28fd491 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -63,7 +63,6 @@ public class TestResourceAwareScheduler {
private static final Config defaultTopologyConf = new Config();
-
@BeforeClass
public static void initConf() {
defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
[8/8] storm git commit: Merge branch '1300' of https://github.com/zhuoliu/storm into STORM-1300
Posted by zh...@apache.org.
Merge branch '1300' of https://github.com/zhuoliu/storm into STORM-1300
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da7969ea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da7969ea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da7969ea
Branch: refs/heads/master
Commit: da7969ea946ca9baa633a59454cc9ab0de4542db
Parents: 89a349e 3c79096
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Mar 28 14:06:31 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Mon Mar 28 14:06:31 2016 -0500
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/Config.java | 2 +-
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
.../resource/TestResourceAwareScheduler.java | 680 ++++++++++++++++-
.../TestUtilsForResourceAwareScheduler.java | 43 +-
5 files changed, 732 insertions(+), 751 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/da7969ea/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/da7969ea/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------