Posted to commits@storm.apache.org by zh...@apache.org on 2016/03/28 21:07:21 UTC

[1/8] storm git commit: [STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to java.

Repository: storm
Updated Branches:
  refs/heads/master 89a349eb8 -> da7969ea9


[STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to java.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1b93de1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1b93de1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1b93de1

Branch: refs/heads/master
Commit: c1b93de1650d113df0e1d0493780d9915bf3dacc
Parents: c2cf3be
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:06:00 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:06:00 2016 -0500

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 683 ++++++++++++++++++-
 .../TestUtilsForResourceAwareScheduler.java     |  73 +-
 2 files changed, 754 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 78c73a1..e0336ea 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.storm.scheduler.resource;
 
 import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
@@ -29,11 +30,14 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +48,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
 
 public class TestResourceAwareScheduler {
 
@@ -54,6 +61,680 @@ public class TestResourceAwareScheduler {
 
     private static int currentTime = 1450418597;
 
+    private static final Config defaultTopologyConf = new Config();
+
+
+    @BeforeClass
+    public static void initConf() {
+        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+
+        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
+        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
+        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+    }
+
+    @Test
+    public void testRASNodeSlotAssign() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
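+        // 5 supervisors ("sup-0" .. "sup-4"), each with 4 worker slots and 400.0% CPU / 2000.0 MB of memory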
+        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
+        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
+        Assert.assertEquals(5, nodes.size());
+        RAS_Node node = nodes.get("sup-0");
+
+        Assert.assertEquals("sup-0", node.getId());
+        Assert.assertTrue(node.isAlive());
+        Assert.assertEquals(0, node.getRunningTopologies().size());
+        Assert.assertTrue(node.isTotallyFree());
+        Assert.assertEquals(4, node.totalSlotsFree());
+        Assert.assertEquals(0, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+
+        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+        List<ExecutorDetails> executors11 = new ArrayList<>();
+        executors11.add(new ExecutorDetails(1, 1));
+        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
+        Assert.assertEquals(1, node.getRunningTopologies().size());
+        Assert.assertFalse(node.isTotallyFree());
+        Assert.assertEquals(3, node.totalSlotsFree());
+        Assert.assertEquals(1, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+
+        List<ExecutorDetails> executors12 = new ArrayList<>();
+        executors12.add(new ExecutorDetails(2, 2));
+        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
+        Assert.assertEquals(1, node.getRunningTopologies().size());
+        Assert.assertFalse(node.isTotallyFree());
+        Assert.assertEquals(2, node.totalSlotsFree());
+        Assert.assertEquals(2, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+
+        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
+
+        List<ExecutorDetails> executors21 = new ArrayList<>();
+        executors21.add(new ExecutorDetails(1, 1));
+        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
+        Assert.assertEquals(2, node.getRunningTopologies().size());
+        Assert.assertFalse(node.isTotallyFree());
+        Assert.assertEquals(1, node.totalSlotsFree());
+        Assert.assertEquals(3, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+
+        List<ExecutorDetails> executors22 = new ArrayList<>();
+        executors22.add(new ExecutorDetails(2, 2));
+        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
+        Assert.assertEquals(2, node.getRunningTopologies().size());
+        Assert.assertFalse(node.isTotallyFree());
+        Assert.assertEquals(0, node.totalSlotsFree());
+        Assert.assertEquals(4, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+
+        node.freeAllSlots();
+        Assert.assertEquals(0, node.getRunningTopologies().size());
+        Assert.assertTrue(node.isTotallyFree());
+        Assert.assertEquals(4, node.totalSlotsFree());
+        Assert.assertEquals(0, node.totalSlotsUsed());
+        Assert.assertEquals(4, node.totalSlots());
+    }
+
+    @Test
+    public void sanityTestOfScheduling() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
+
+        Config config = new Config();
+        config.putAll(defaultTopologyConf);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
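+        // this small topology yields 2 executors, which should land in a single slot on the single supervisor (asserted below)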
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
+        Set<WorkerSlot> assignedSlots = assignment.getSlots();
+        Set<String> nodesIDs = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots) {
+            nodesIDs.add(slot.getNodeId());
+        }
+        Collection<ExecutorDetails> executors = assignment.getExecutors();
+
+        Assert.assertEquals(1, assignedSlots.size());
+        Assert.assertEquals(1, nodesIDs.size());
+        Assert.assertEquals(2, executors.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+    }
+
+    @Test
+    public void testTopologyWithMultipleSpouts() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
+        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
+        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
+        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
+        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
+        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
+        StormTopology stormTopology1 = builder1.createTopology();
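+        // 2 spouts + 5 bolts with parallelism 1 each -> 7 executors in total (asserted below)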
+
+        Config config = new Config();
+        config.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
+
+        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
+        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
+        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topoMap.put(topology2.getId(), topology2);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+        Set<String> nodesIDs1 = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots1) {
+            nodesIDs1.add(slot.getNodeId());
+        }
+        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+        Assert.assertEquals(1, assignedSlots1.size());
+        Assert.assertEquals(1, nodesIDs1.size());
+        Assert.assertEquals(7, executors1.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+
+        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
+        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
+        Set<String> nodesIDs2 = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots2) {
+            nodesIDs2.add(slot.getNodeId());
+        }
+        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
+
+        Assert.assertEquals(1, assignedSlots2.size());
+        Assert.assertEquals(1, nodesIDs2.size());
+        Assert.assertEquals(2, executors2.size());
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+    }
+
+    @Test
+    public void testTopologySetCpuAndMemLoad() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
+        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
+        StormTopology stormTopology1 = builder1.createTopology();
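+        // 2 executors at 20.0% CPU and 200.0 MB each -> 40.0% CPU and 400.0 MB in total (asserted below)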
+
+        Config config = new Config();
+        config.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+        double assignedMemory = 0.0;
+        double assignedCpu = 0.0;
+        Set<String> nodesIDs1 = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots1) {
+            nodesIDs1.add(slot.getNodeId());
+            assignedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap();
+            assignedCpu += slot.getAllocatedCpu();
+        }
+        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+
+        Assert.assertEquals(1, assignedSlots1.size());
+        Assert.assertEquals(1, nodesIDs1.size());
+        Assert.assertEquals(2, executors1.size());
+        Assert.assertEquals(400.0, assignedMemory, 0.001);
+        Assert.assertEquals(40.0, assignedCpu, 0.001);
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+    }
+
+    @Test
+    public void testResourceLimitation() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with one spout (parallelism 2) and one bolt
+        builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
+        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
+        StormTopology stormTopology1 = builder1.createTopology();
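+        // each of the 2 spout executors needs 250.0% CPU and 1000.0 + 200.0 = 1200.0 MB; the bolt executor needs 100.0% CPU and 500.0 + 100.0 = 600.0 MB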
+
+        Config config = new Config();
+        config.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
+        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
+        Set<String> nodesIDs1 = new HashSet<>();
+        for (WorkerSlot slot : assignedSlots1) {
+            nodesIDs1.add(slot.getNodeId());
+        }
+        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
+        List<Double> assignedExecutorMemory = new ArrayList<>();
+        List<Double> assignedExecutorCpu = new ArrayList<>();
+        for (ExecutorDetails executor : executors1) {
+            assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
+            assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
+        }
+        Collections.sort(assignedExecutorCpu);
+        Collections.sort(assignedExecutorMemory);
+
+        Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+        Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+        Map<Double, Double> cpuAvailableToUsed = new HashMap<>();
+        Map<Double, Double> memoryAvailableToUsed = new HashMap<>();
+
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) {
+            executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+        }
+        for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+            List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+            if (executorsOnSupervisor == null) {
+                executorsOnSupervisor = new ArrayList<>();
+                supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+            }
+            executorsOnSupervisor.add(entry.getKey());
+        }
+        for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+            Double supervisorTotalCpu = entry.getKey().getTotalCPU();
+            Double supervisorTotalMemory = entry.getKey().getTotalMemory();
+            Double supervisorUsedCpu = 0.0;
+            Double supervisorUsedMemory = 0.0;
+            for (ExecutorDetails executor: entry.getValue()) {
+                supervisorUsedMemory += topology1.getTotalMemReqTask(executor);
+                supervisorUsedCpu += topology1.getTotalCpuReqTask(executor);
+            }
+            cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu);
+            memoryAvailableToUsed.put(supervisorTotalMemory, supervisorUsedMemory);
+        }
+        // executor0 resides on one worker (on one node), executor1 and executor2 on another worker (on the other node)
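+        // (two spout executors cannot share a node: 2 * 1200.0 MB = 2400.0 MB exceeds the 2000.0 MB node memory capacity)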
+        Assert.assertEquals(2, assignedSlots1.size());
+        Assert.assertEquals(2, nodesIDs1.size());
+        Assert.assertEquals(3, executors1.size());
+
+        Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
+        Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
+        Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
+        Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
+        Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
+        Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
+
+        for (Map.Entry<Double, Double> entry : memoryAvailableToUsed.entrySet()) {
+            Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
+        }
+        for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
+            Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+    }
+
+    @Test
+    public void testScheduleResilience() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0);
+
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        // the memory requirement is large enough that two executors cannot both be fully assigned to one node
+        config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0);
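+        // (2 * 1280.0 MB = 2560.0 MB exceeds the 2000.0 MB memory capacity of a node)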
+        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 2, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0);
+
+        // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology2.getId(), topology2);
+        Topologies topologies = new Topologies(topoMap);
+
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+
+        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId());
+        // pick a worker to mock as failed
+        WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
+        Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
+        List<ExecutorDetails> failedExecutors = new ArrayList<>();
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
+            if (entry.getValue().equals(failedWorker)) {
+                failedExecutors.add(entry.getKey());
+            }
+        }
+        for (ExecutorDetails executor : failedExecutors) {
+            executorToSlot.remove(executor); // remove executor details assigned to the failed worker
+        }
+        Map<ExecutorDetails, WorkerSlot> copyOfOldMapping = new HashMap<>(executorToSlot);
+        Set<ExecutorDetails> healthyExecutors = copyOfOldMapping.keySet();
+
+        rs.schedule(topologies, cluster);
+        SchedulerAssignment newAssignment = cluster.getAssignmentById(topology2.getId());
+        Map<ExecutorDetails, WorkerSlot> newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : healthyExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+        // end of Test1
+
+        // Test2: When a supervisor fails, RAS does not alter existing assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 0));
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 1));
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1));
+        Map<String, SchedulerAssignmentImpl> existingAssignments = new HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        Set<ExecutorDetails> existingExecutors = copyOfOldMapping.keySet();
+        Map<String, SupervisorDetails> supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        Cluster cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test2
+
+        // Test3: When a supervisor and a worker on it fails, RAS does not alter existing assignments
+        executorToSlot = new HashMap<>();
+        executorToSlot.put(new ExecutorDetails(0, 0), new WorkerSlot("sup-0", 1)); // the worker to orphan
+        executorToSlot.put(new ExecutorDetails(1, 1), new WorkerSlot("sup-0", 2)); // the worker that fails
+        executorToSlot.put(new ExecutorDetails(2, 2), new WorkerSlot("sup-1", 1)); // the healthy worker
+        existingAssignments = new HashMap<>();
+        assignment = new SchedulerAssignmentImpl(topology1.getId(), executorToSlot);
+        existingAssignments.put(topology1.getId(), assignment);
+        // delete the executor on the failed worker of sup-0 from topology1's assignment so that an actual scheduling round is triggered for testing
+        executorToSlot.remove(new ExecutorDetails(1, 1));
+
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+        existingExecutors = copyOfOldMapping.keySet(); // namely the two executors on the orphaned worker and the healthy worker
+        supMap1 = new HashMap<>(supMap);
+        supMap1.remove("sup-0"); // mock the supervisor sup-0 as a failed supervisor
+        cluster1 = new Cluster(iNimbus, supMap1, existingAssignments, config1);
+
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : existingExecutors) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
+        // end of Test3
+
+        // Test4: Scheduling a new topology does not disturb other assignments unnecessarily
+        cluster1 = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+        assignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        executorToSlot = assignment.getExecutorToSlot();
+        copyOfOldMapping = new HashMap<>(executorToSlot);
+
+        topoMap.put(topology2.getId(), topology2);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster1);
+
+        newAssignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        newExecutorToSlot = newAssignment.getExecutorToSlot();
+
+        for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
+            Assert.assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
+        }
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster1.getStatusMap().get(topology2.getId()));
+    }
+
+    @Test
+    public void testHeterogeneousCluster() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap1 = new HashMap<>(); // strong supervisor node
+        resourceMap1.put(Config.SUPERVISOR_CPU_CAPACITY, 800.0);
+        resourceMap1.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 4096.0);
+        Map<String, Number> resourceMap2 = new HashMap<>(); // weak supervisor node
+        resourceMap2.put(Config.SUPERVISOR_CPU_CAPACITY, 200.0);
+        resourceMap2.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1024.0);
+
+        Map<String, SupervisorDetails> supMap = new HashMap<String, SupervisorDetails>();
+        for (int i = 0; i < 2; i++) {
+            List<Number> ports = new LinkedList<Number>();
+            for (int j = 0; j < 4; j++) {
+                ports.add(j);
+            }
+            SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, (Map)(i == 0 ? resourceMap1 : resourceMap2));
+            supMap.put(sup.getId(), sup);
+        }
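+        // cluster totals: 800.0 + 200.0 = 1000.0% CPU and 4096.0 + 1024.0 = 5120.0 MB of memory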
+
+        // topo1 has a single huge task that cannot fit on the weak supervisor (300.0% CPU > 200.0% and 2000.0 + 48.0 = 2048.0 MB > 1024.0 MB)
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(300.0).setMemoryLoad(2000.0, 48.0);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0);
+
+        // topo2 has 4 large tasks
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(100.0).setMemoryLoad(500.0, 12.0);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 4, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0);
+
+        // topo3 has 4 medium tasks
+        TopologyBuilder builder3 = new TopologyBuilder();
+        builder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(20.0).setMemoryLoad(200.0, 56.0);
+        StormTopology stormTopology3 = builder3.createTopology();
+        Config config3 = new Config();
+        config3.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap3 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology3, 4, 0);
+        TopologyDetails topology3 = new TopologyDetails("topology3", config3, stormTopology3, 1, executorMap3, 0);
+
+        // topo4 has 12 small tasks, whose mem usage does not evenly divide a node's mem capacity
+        TopologyBuilder builder4 = new TopologyBuilder();
+        builder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(30.0).setMemoryLoad(100.0, 0.0);
+        StormTopology stormTopology4 = builder4.createTopology();
+        Config config4 = new Config();
+        config4.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap4 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology4, 12, 0);
+        TopologyDetails topology4 = new TopologyDetails("topology4", config4, stormTopology4, 1, executorMap4, 0);
+
+        // topo5 has 40 small tasks; together they should exactly use up both the cpu and mem in the cluster
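+        // (40 * 25.0 = 1000.0% CPU and 40 * (100.0 + 28.0) = 5120.0 MB, exactly the cluster's total capacity)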
+        TopologyBuilder builder5 = new TopologyBuilder();
+        builder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(25.0).setMemoryLoad(100.0, 28.0);
+        StormTopology stormTopology5 = builder5.createTopology();
+        Config config5 = new Config();
+        config5.putAll(defaultTopologyConf);
+        Map<ExecutorDetails, String> executorMap5 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology5, 40, 0);
+        TopologyDetails topology5 = new TopologyDetails("topology5", config5, stormTopology5, 1, executorMap5, 0);
+
+        // Test1: Launch topos 1-3 together; on each node either the mem or the cpu resource should be used up exactly, due to exact division
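+        // (total mem demand: 2048.0 + 4 * 512.0 + 4 * 256.0 = 5120.0 MB, exactly the cluster's mem capacity; total cpu demand: 300.0 + 400.0 + 80.0 = 780.0% of 1000.0%)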
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topoMap.put(topology2.getId(), topology2);
+        topoMap.put(topology3.getId(), topology3);
+        Topologies topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
+
+        Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+        Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+
+        final Double EPSILON = 0.0001;
+        for (SupervisorDetails supervisor : supMap.values()) {
+            Double cpuAvailable = supervisor.getTotalCPU();
+            Double memAvailable = supervisor.getTotalMemory();
+            Double cpuUsed = superToCpu.get(supervisor);
+            Double memUsed = superToMem.get(supervisor);
+            Assert.assertTrue((Math.abs(memAvailable - memUsed) < EPSILON) || (Math.abs(cpuAvailable - cpuUsed) < EPSILON));
+        }
+        // end of Test1
+
+        // Test2: Launch topos 1, 2 and 4; together they request a little more mem than is available, so one of the 3 topos will not be scheduled
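+        // (total mem demand: 2048.0 + 2048.0 + 12 * 100.0 = 5296.0 MB > 5120.0 MB available)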
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        topoMap.put(topology2.getId(), topology2);
+        topoMap.put(topology4.getId(), topology4);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        int numTopologiesAssigned = 0;
+        if (cluster.getStatusMap().get(topology1.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+            numTopologiesAssigned++;
+        }
+        if (cluster.getStatusMap().get(topology2.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+            numTopologiesAssigned++;
+        }
+        if (cluster.getStatusMap().get(topology4.getId()).equals("Running - Fully Scheduled by DefaultResourceAwareStrategy")) {
+            numTopologiesAssigned++;
+        }
+        Assert.assertEquals(2, numTopologiesAssigned);
+        //end of Test2
+
+        //Test3: "Launch topo5 only, both mem and cpu should be exactly used up"
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        topoMap = new HashMap<>();
+        topoMap.put(topology5.getId(), topology5);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
+        superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        for (SupervisorDetails supervisor : supMap.values()) {
+            Double cpuAvailable = supervisor.getTotalCPU();
+            Double memAvailable = supervisor.getTotalMemory();
+            Double cpuUsed = superToCpu.get(supervisor);
+            Double memUsed = superToMem.get(supervisor);
+            Assert.assertEquals(cpuAvailable, cpuUsed, 0.0001);
+            Assert.assertEquals(memAvailable, memUsed, 0.0001);
+        }
+        //end of Test3
+    }
+
+    @Test
+    public void testTopologyWorkerMaxHeapSize() {
+        // Test1: Verify that RAS spreads executors across multiple workers based on the max heap size limit the topology sets for a worker
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
+
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
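+        // each executor requires the default 128.0 MB on-heap, so with a 128.0 MB max worker heap every executor needs its own worker (4 workers asserted below)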
+        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 4, 0);
+        TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 1, executorMap1, 0);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(topology1.getId(), topology1);
+        Topologies topologies = new Topologies(topoMap);
+        rs.prepare(config1);
+        rs.schedule(topologies, cluster);
+        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
+        Assert.assertEquals(4, cluster.getAssignedNumWorkers(topology1));
+
+        // Test2: test when no more workers are available due to the topology worker max heap size limit, even though memory is still available
+        // wordSpout2 contains 5 executors that need scheduling, and each of those executors has a memory requirement of 128.0 MB.
+        // The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0 MB.
+        // Thus, one executor will not be able to get scheduled, which fails the scheduling of the whole topology, and no executors of this topology will be scheduled.
+        TopologyBuilder builder2 = new TopologyBuilder();
+        builder2.setSpout("wordSpout2", new TestWordSpout(), 5);
+        StormTopology stormTopology2 = builder2.createTopology();
+        Config config2 = new Config();
+        config2.putAll(defaultTopologyConf);
+        config2.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 5, 0);
+        TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 1, executorMap2, 0);
+        cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config2);
+        topoMap = new HashMap<>();
+        topoMap.put(topology2.getId(), topology2);
+        topologies = new Topologies(topoMap);
+        rs.prepare(config2);
+        rs.schedule(topologies, cluster);
+        Assert.assertEquals("Not enough resources to schedule - 0/5 executors scheduled", cluster.getStatusMap().get(topology2.getId()));
+        Assert.assertEquals(5, cluster.getUnassignedExecutors(topology2).size());
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
+        // The topology will not be successfully scheduled: Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB (128.0) is smaller than 129.0,
+        // the largest memory requirement of a component in the topology.
+        TopologyBuilder builder1 = new TopologyBuilder();
+        builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+        StormTopology stormTopology1 = builder1.createTopology();
+        Config config1 = new Config();
+        config1.putAll(defaultTopologyConf);
+        config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+        config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
+        StormSubmitter.submitTopologyWithProgressBar("test", config1, stormTopology1);
+    }
+
     @Test
     public void TestReadInResourceAwareSchedulerUserPools() {
         Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);

http://git-wip-us.apache.org/repos/asf/storm/blob/c1b93de1/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index f21645b..7cd21ce 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -29,6 +29,8 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -50,6 +52,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -109,7 +112,7 @@ public class TestUtilsForResourceAwareScheduler {
     public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
         Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
         int startTask = 0;
-        int endTask = 1;
+        int endTask = 0;
         for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
             for (int i = 0; i < spoutParallelism; i++) {
                 retMap.put(new ExecutorDetails(startTask, endTask), entry.getKey());
@@ -285,4 +288,72 @@ public class TestUtilsForResourceAwareScheduler {
         }
         return ret;
     }
+
+    public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
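+        // for each supervisor, sum the memory requirements of every executor (across all topologies) assigned to it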
+        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+        Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
+        Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
+        for (SupervisorDetails supervisor : supervisors) {
+            superToMem.put(supervisor, 0.0);
+        }
+
+        for (SchedulerAssignment assignment : assignments) {
+            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+            Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+            TopologyDetails topology = topologies.getById(assignment.getTopologyId());
+            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
+                executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+            }
+            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+                List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+                if (executorsOnSupervisor == null) {
+                    executorsOnSupervisor = new ArrayList<>();
+                    supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+                }
+                executorsOnSupervisor.add(entry.getKey());
+            }
+            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+                Double supervisorUsedMemory = 0.0;
+                for (ExecutorDetails executor: entry.getValue()) {
+                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
+                }
+                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
+            }
+        }
+        return superToMem;
+    }
+
+    public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
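+        // for each supervisor, sum the cpu requirements of every executor (across all topologies) assigned to it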
+        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+        Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
+        Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
+        for (SupervisorDetails supervisor : supervisors) {
+            superToCpu.put(supervisor, 0.0);
+        }
+
+        for (SchedulerAssignment assignment : assignments) {
+            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
+            Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
+            TopologyDetails topology = topologies.getById(assignment.getTopologyId());
+            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
+                executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
+            }
+            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
+                List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
+                if (executorsOnSupervisor == null) {
+                    executorsOnSupervisor = new ArrayList<>();
+                    supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
+                }
+                executorsOnSupervisor.add(entry.getKey());
+            }
+            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
+                Double supervisorUsedCpu = 0.0;
+                for (ExecutorDetails executor: entry.getValue()) {
+                    supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+                }
+                superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+            }
+        }
+        return superToCpu;
+    }
 }


[5/8] storm git commit: Modify comments

Posted by zh...@apache.org.
Modify comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/13e8b11a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/13e8b11a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/13e8b11a

Branch: refs/heads/master
Commit: 13e8b11a580cea2e67760d14b6efef9a943b3faf
Parents: 58f1161
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 15:35:54 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 15:35:54 2016 -0500

----------------------------------------------------------------------
 .../storm/scheduler/resource/TestResourceAwareScheduler.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/13e8b11a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 9cfdc6e..5ae1432 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -257,9 +257,10 @@ public class TestResourceAwareScheduler {
         Map<String, Number> resourceMap = new HashMap<>();
         resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
         resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
+        // to test whether two tasks will be assigned to one or two nodes
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
 
-        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+        TopologyBuilder builder1 = new TopologyBuilder();
         builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
         builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
         StormTopology stormTopology1 = builder1.createTopology();


[3/8] storm git commit: Delete the clj code

Posted by zh...@apache.org.
Delete the clj code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63406016
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63406016
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63406016

Branch: refs/heads/master
Commit: 6340601684ccd1cbe86d27594f8e32e78ec8a0df
Parents: 7a302e3
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:13:28 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:13:28 2016 -0500

----------------------------------------------------------------------
 .../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
 1 file changed, 738 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63406016/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
deleted file mode 100644
index 4ca0721..0000000
--- a/storm-core/test/clj/org/apache/storm/scheduler/resource_aware_scheduler_test.clj
+++ /dev/null
@@ -1,738 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.resource-aware-scheduler-test
-  (:use [clojure test])
-  (:use [org.apache.storm util config testing])
-  (:use [org.apache.storm.internal thrift])
-  (:require [org.apache.storm.util :refer [map-val]])
-  (:require [org.apache.storm.daemon [nimbus :as nimbus]])
-  (:import [org.apache.storm.generated StormTopology]
-           [org.apache.storm Config]
-           [org.apache.storm.testing TestWordSpout TestWordCounter]
-           [org.apache.storm.topology TopologyBuilder]
-           [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
-            SchedulerAssignmentImpl Topologies TopologyDetails])
-  (:import [org.apache.storm.scheduler.resource RAS_Node RAS_Nodes ResourceAwareScheduler])
-  (:import [org.apache.storm Config StormSubmitter])
-  (:import [org.apache.storm LocalDRPC LocalCluster])
-  (:import [java.util HashMap]))
-
-(defn gen-supervisors [count ports]
-  (into {} (for [id (range count)
-                :let [supervisor (SupervisorDetails. (str "id" id)
-                                       (str "host" id)
-                                       (list ) (map int (range ports))
-                                   {Config/SUPERVISOR_MEMORY_CAPACITY_MB 2000.0
-                                    Config/SUPERVISOR_CPU_CAPACITY 400.0})]]
-            {(.getId supervisor) supervisor})))
-
-(defn to-top-map [topologies]
-  (into {} (for [top topologies] {(.getId top) top})))
-
-(defn ed [id] (ExecutorDetails. (int id) (int id)))
-
-(defn mk-ed-map [arg]
-  (into {}
-    (for [[name start end] arg]
-      (into {}
-        (for [at (range start end)]
-          {(ed at) name})))))
-(def DEFAULT_PRIORITY_STRATEGY "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy")
-(def DEFAULT_EVICTION_STRATEGY "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy")
-(def DEFAULT_SCHEDULING_STRATEGY "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy")
-
-;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->mem-usage [^Cluster cluster ^Topologies topologies]
-  (let [assignments (.values (.getAssignments cluster))
-        supers (.values (.getSupervisors cluster))
-        super->mem-usage (HashMap.)
-        _ (doseq [super supers] 
-             (.put super->mem-usage super 0))]  ;; initialize the mem-usage as 0 for all supers
-    (doseq [assignment assignments]
-      (let [ed->super (into {}
-                            (for [[ed slot] (.getExecutorToSlot assignment)]
-                              {ed (.getSupervisorById cluster (.getNodeId slot))}))
-            super->eds (clojurify-structure (Utils/reverseMap ed->super))
-            topology (.getById topologies (.getTopologyId assignment))
-            super->mem-pertopo (map-val (fn [eds] 
-                                          (reduce + (map #(.getTotalMemReqTask topology %) eds))) 
-                                        super->eds)]  ;; sum up the one topo's eds' mem usage on a super 
-            (doseq [[super mem] super->mem-pertopo]
-              (.put super->mem-usage 
-                    super (+ mem (.get super->mem-usage super)))))) ;; add all topo's mem usage for each super
-    super->mem-usage))
-
-;; get the super->cpu HashMap by counting the eds' cpu usage of all topos on each super
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn get-super->cpu-usage [^Cluster cluster ^Topologies topologies]
-  (let [assignments (.values (.getAssignments cluster))
-        supers (.values (.getSupervisors cluster))
-        super->cpu-usage (HashMap.)
-        _ (doseq [super supers] 
-             (.put super->cpu-usage super 0))] ;; initialize the cpu-usage as 0 for all supers
-    (doseq [assignment assignments]
-      (let [ed->super (into {}
-                            (for [[ed slot] (.getExecutorToSlot assignment)]
-                              {ed (.getSupervisorById cluster (.getNodeId slot))}))
-            super->eds (clojurify-structure (Utils/reverseMap ed->super))
-            topology (.getById topologies (.getTopologyId assignment))
-            super->cpu-pertopo (map-val (fn [eds] 
-                                          (reduce + (map #(.getTotalCpuReqTask topology %) eds))) 
-                                        super->eds)] ;; sum up the one topo's eds' cpu usage on a super 
-            (doseq [[super cpu] super->cpu-pertopo]
-              (.put super->cpu-usage 
-                    super (+ cpu (.get super->cpu-usage super))))))  ;; add all topo's cpu usage for each super
-    super->cpu-usage))
-
-; testing resource/Node class
-(deftest test-node
-  (let [supers (gen-supervisors 5 4)
-        cluster (Cluster. (nimbus/standalone-nimbus) supers {} {})
-        topologies (Topologies. (to-top-map []))
-        node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
-        topology1 (TopologyDetails. "topology1" {} nil 0)
-        topology2 (TopologyDetails. "topology2" {} nil 0)]
-    (is (= 5 (.size node-map)))
-    (let [node (.get node-map "id0")]
-      (is (= "id0" (.getId node)))
-      (is (= true (.isAlive node)))
-      (is (= 0 (.size (.getRunningTopologies node))))
-      (is (= true (.isTotallyFree node)))
-      (is (= 4 (.totalSlotsFree node)))
-      (is (= 0 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-      (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 1 1)))
-      (is (= 1 (.size (.getRunningTopologies node))))
-      (is (= false (.isTotallyFree node)))
-      (is (= 3 (.totalSlotsFree node)))
-      (is (= 1 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-      (.assign node (.next (.iterator (.getFreeSlots node))) topology1 (list (ExecutorDetails. 2 2)))
-      (is (= 1 (.size (.getRunningTopologies node))))
-      (is (= false (.isTotallyFree node)))
-      (is (= 2 (.totalSlotsFree node)))
-      (is (= 2 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-      (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 1 1)))
-      (is (= 2 (.size (.getRunningTopologies node))))
-      (is (= false (.isTotallyFree node)))
-      (is (= 1 (.totalSlotsFree node)))
-      (is (= 3 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-      (.assign node (.next (.iterator (.getFreeSlots node))) topology2 (list (ExecutorDetails. 2 2)))
-      (is (= 2 (.size (.getRunningTopologies node))))
-      (is (= false (.isTotallyFree node)))
-      (is (= 0 (.totalSlotsFree node)))
-      (is (= 4 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-      (.freeAllSlots node)
-      (is (= 0 (.size (.getRunningTopologies node))))
-      (is (= true (.isTotallyFree node)))
-      (is (= 4 (.totalSlotsFree node)))
-      (is (= 0 (.totalSlotsUsed node)))
-      (is (= 4 (.totalSlots node)))
-    )))
-
-(deftest test-sanity-resource-aware-scheduler
-  (let [builder (TopologyBuilder.)
-        _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
-        _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout")
-        supers (gen-supervisors 1 2)
-        storm-topology (.createTopology builder)
-        topology1 (TopologyDetails. "topology1"
-                    {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology
-                    1
-                    (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
-        cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                   "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-        topologies (Topologies. (to-top-map [topology1]))
-        node-map (RAS_Nodes/getAllNodesFrom cluster topologies)
-        scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-    (.schedule scheduler topologies cluster)
-    (let [assignment (.getAssignmentById cluster "topology1")
-          assigned-slots (.getSlots assignment)
-          executors (.getExecutors assignment)]
-      (is (= 1 (.size assigned-slots)))
-      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
-      (is (= 2 (.size executors))))
-    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-topology-with-multiple-spouts
-  (let [builder1 (TopologyBuilder.)  ;; a topology with multiple spouts
-        _ (.setSpout builder1 "wordSpout1" (TestWordSpout.) 1)
-        _ (.setSpout builder1 "wordSpout2" (TestWordSpout.) 1)
-        _ (doto
-            (.setBolt builder1 "wordCountBolt1" (TestWordCounter.) 1)
-            (.shuffleGrouping "wordSpout1")
-            (.shuffleGrouping "wordSpout2"))
-        _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt2" (TestWordCounter.) 1) "wordCountBolt1")
-        _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt3" (TestWordCounter.) 1) "wordCountBolt1")
-        _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt4" (TestWordCounter.) 1) "wordCountBolt2")
-        _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt5" (TestWordCounter.) 1) "wordSpout2")
-        storm-topology1 (.createTopology builder1)
-        topology1 (TopologyDetails. "topology1"
-                    {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology1
-                    1
-                    (mk-ed-map [["wordSpout1" 0 1]
-                                ["wordSpout2" 1 2]
-                                ["wordCountBolt1" 2 3]
-                                ["wordCountBolt2" 3 4]
-                                ["wordCountBolt3" 4 5]
-                                ["wordCountBolt4" 5 6]
-                                ["wordCountBolt5" 6 7]]))
-        builder2 (TopologyBuilder.)  ;; a topology with two unconnected partitions
-        _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1)
-        _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1)
-        storm-topology2 (.createTopology builder2)
-        topology2 (TopologyDetails. "topology2"
-                    {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology2
-                    1
-                    (mk-ed-map [["wordSpoutX" 0 1]
-                                ["wordSpoutY" 1 2]]))
-        supers (gen-supervisors 2 4)
-        cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                   "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-        topologies (Topologies. (to-top-map [topology1 topology2]))
-        scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-    (.schedule scheduler topologies cluster)
-    (let [assignment (.getAssignmentById cluster "topology1")
-          assigned-slots (.getSlots assignment)
-          executors (.getExecutors assignment)]
-      (is (= 1 (.size assigned-slots)))
-      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
-      (is (= 7 (.size executors))))
-    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
-    (let [assignment (.getAssignmentById cluster "topology2")
-          assigned-slots (.getSlots assignment)
-          executors (.getExecutors assignment)]
-      (is (= 1 (.size assigned-slots)))
-      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
-      (is (= 2 (.size executors))))
-    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-topology-set-memory-and-cpu-load
-  (let [builder (TopologyBuilder.)
-        _ (.setSpout builder "wordSpout" (TestWordSpout.) 1)
-        _ (doto
-            (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
-            (.setMemoryLoad 110.0)
-            (.setCPULoad 20.0)
-            (.shuffleGrouping "wordSpout"))
-        supers (gen-supervisors 2 2)  ;; to test whether two tasks will be assigned to one or two nodes
-        storm-topology (.createTopology builder)
-        topology2 (TopologyDetails. "topology2"
-                    {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology
-                    2
-                    (mk-ed-map [["wordSpout" 0 1]
-                                ["wordCountBolt" 1 2]]))
-        cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                   "org.apache.storm.testing.AlternateRackDNSToSwitchMapping"})
-        topologies (Topologies. (to-top-map [topology2]))
-        scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-    (.schedule scheduler topologies cluster)
-    (let [assignment (.getAssignmentById cluster "topology2")
-          assigned-slots (.getSlots assignment)
-          executors (.getExecutors assignment)]
-      ;; 4 slots across 2 machines; all executors assigned to a single slot
-      (is (= 1 (.size assigned-slots)))
-      (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
-      (is (= 2 (.size executors))))
-    (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-(deftest test-resource-limitation
-  (let [builder (TopologyBuilder.)
-        _ (doto (.setSpout builder "wordSpout" (TestWordSpout.) 2)
-            (.setMemoryLoad 1000.0 200.0)
-            (.setCPULoad 250.0))
-        _ (doto (.setBolt builder "wordCountBolt" (TestWordCounter.) 1)
-            (.shuffleGrouping  "wordSpout")
-            (.setMemoryLoad 500.0 100.0)
-            (.setCPULoad 100.0))
-        supers (gen-supervisors 2 2)  ;; need at least two nodes to hold these executors
-        storm-topology (.createTopology builder)
-        topology1 (TopologyDetails. "topology1"
-                    {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology
-                    2 ;; need two workers, each on one node
-                    (mk-ed-map [["wordSpout" 0 2]
-                                ["wordCountBolt" 2 3]]))
-        cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                   "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-        topologies (Topologies. (to-top-map [topology1]))
-        scheduler (ResourceAwareScheduler.)]
-    (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                         RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-    (.schedule scheduler topologies cluster)
-    (let [assignment (.getAssignmentById cluster "topology1")
-          assigned-slots (.getSlots assignment)
-          node-ids (map #(.getNodeId %) assigned-slots)
-          executors (.getExecutors assignment)
-          epsilon 0.000001
-          assigned-ed-mem (sort (map #(.getTotalMemReqTask topology1 %) executors))
-          assigned-ed-cpu (sort (map #(.getTotalCpuReqTask topology1 %) executors))
-          ed->super (into {}
-                            (for [[ed slot] (.getExecutorToSlot assignment)]
-                              {ed (.getSupervisorById cluster (.getNodeId slot))}))
-          super->eds (clojurify-structure (Utils/reverseMap ed->super))
-          mem-avail->used (into []
-                                 (for [[super eds] super->eds]
-                                   [(.getTotalMemory super) (reduce + (map #(.getTotalMemReqTask topology1 %) eds))]))
-          cpu-avail->used (into []
-                                 (for [[super eds] super->eds]
-                                   [(.getTotalCPU super) (reduce + (map #(.getTotalCpuReqTask topology1 %) eds))]))]
-    ;; 4 slots across 2 machines; executors spread over 2 workers on 2 nodes
-    (is (= 2 (.size assigned-slots)))  ;; executor0 resides on one worker (on one node), executor1 and executor2 on another worker (on the other node)
-    (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot))))))
-    (is (= 3 (.size executors)))
-    ;; make sure resource (mem/cpu) assigned equals to resource specified
-    (is (< (Math/abs (- 600.0 (first assigned-ed-mem))) epsilon))
-    (is (< (Math/abs (- 1200.0 (second assigned-ed-mem))) epsilon))
-    (is (< (Math/abs (- 1200.0 (last assigned-ed-mem))) epsilon))
-    (is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon))
-    (is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon))
-    (is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon))
-    (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem must not exceed total
-      (is (>= avail used)))
-    (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu must not exceed total
-      (is (>= avail used))))
-  (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))))
-
-(deftest test-scheduling-resilience
-  (let [supers (gen-supervisors 2 2)
-         builder1 (TopologyBuilder.)
-         _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
-         storm-topology1 (.createTopology builder1)
-         topology1 (TopologyDetails. "topology1"
-                     {TOPOLOGY-NAME "topology-name-1"
-                      TOPOLOGY-SUBMITTER-USER "userC"
-                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                      TOPOLOGY-PRIORITY 0
-                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                     storm-topology1
-                     3 ;; three workers to hold three executors
-                     (mk-ed-map [["spout1" 0 3]]))
-         builder2 (TopologyBuilder.)
-         _ (.setSpout builder2 "spout2" (TestWordSpout.) 2)
-         storm-topology2 (.createTopology builder2)
-         topology2 (TopologyDetails. "topology2"
-                     {TOPOLOGY-NAME "topology-name-2"
-                      TOPOLOGY-SUBMITTER-USER "userC"
-                      TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough that two eds cannot both be fully assigned to one node
-                      TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                      TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                      TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                      TOPOLOGY-PRIORITY 0
-                      TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                     storm-topology2
-                     2  ;; two workers, each holding one executor on its own node
-                     (mk-ed-map [["spout2" 0 2]]))
-        scheduler (ResourceAwareScheduler.)]
-
-    (testing "When a worker fails, RAS does not alter existing assignments on healthy workers"
-      (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology2]))
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            assignment (.getAssignmentById cluster "topology2")
-            failed-worker (first (vec (.getSlots assignment)))  ;; choose a worker to mock as failed
-            ed->slot (.getExecutorToSlot assignment)
-            failed-eds (.get (clojurify-structure (Utils/reverseMap ed->slot)) failed-worker)
-            _ (doseq [ed failed-eds] (.remove ed->slot ed))  ;; remove executor details assigned to the worker
-            copy-old-mapping (HashMap. ed->slot)
-            healthy-eds (.keySet copy-old-mapping)
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            new-assignment (.getAssignmentById cluster "topology2")
-            new-ed->slot (.getExecutorToSlot new-assignment)]
-        ;; for each executor that was scheduled on healthy workers, their slots should remain unchanged after a new scheduling
-        (doseq [ed healthy-eds]
-          (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))
-
-    (testing "When a supervisor fails, RAS does not alter existing assignments"
-      (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
-                                                                        {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0)    ;; worker 0 on the failed super
-                                                                         (ExecutorDetails. 1 1) (WorkerSlot. "id0" 1)    ;; worker 1 on the failed super
-                                                                         (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; worker 2 on the healthy super
-            cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology1]))
-            assignment (.getAssignmentById cluster "topology1")
-            ed->slot (.getExecutorToSlot assignment)
-            copy-old-mapping (HashMap. ed->slot)
-            existing-eds (.keySet copy-old-mapping)  ;; all the three eds on three workers
-            new-cluster (Cluster. (nimbus/standalone-nimbus)
-                                  (dissoc supers "id0")        ;; mock the super0 as a failed supervisor
-                                  (.getAssignments cluster)
-                                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                                   "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned
-            new-assignment (.getAssignmentById new-cluster "topology1")
-            new-ed->slot (.getExecutorToSlot new-assignment)]
-        (doseq [ed existing-eds]
-          (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
-        (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1")))))
-
-    (testing "When a supervisor and a worker on it fails, RAS does not alter existing assignments"
-      (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1"
-                                                                        {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 1)    ;; the worker to orphan
-                                                                         (ExecutorDetails. 1 1) (WorkerSlot. "id0" 2)    ;; the worker to kill
-                                                                         (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; the healthy worker
-            cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology1]))
-            assignment (.getAssignmentById cluster "topology1")
-            ed->slot (.getExecutorToSlot assignment)
-            _ (.remove ed->slot (ExecutorDetails. 1 1))  ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing
-            copy-old-mapping (HashMap. ed->slot)
-            existing-eds (.keySet copy-old-mapping)  ;; namely the two eds on the orphaned worker and the healthy worker
-            new-cluster (Cluster. (nimbus/standalone-nimbus)
-                                  (dissoc supers "id0")        ;; mock the super0 as a failed supervisor
-                                  (.getAssignments cluster)
-                                  {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                                   "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies new-cluster)
-            new-assignment (.getAssignmentById new-cluster "topology1")
-            new-ed->slot (.getExecutorToSlot new-assignment)]
-        (doseq [ed existing-eds]
-          (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap new-cluster) "topology1")))))
-
-    (testing "Scheduling a new topology does not disturb other assignments unnecessarily"
-      (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology1]))
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            assignment (.getAssignmentById cluster "topology1")
-            ed->slot (.getExecutorToSlot assignment)
-            copy-old-mapping (HashMap. ed->slot)
-            new-topologies (Topologies. (to-top-map [topology1 topology2]))  ;; a second topology joins
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler new-topologies cluster)
-            new-assignment (.getAssignmentById cluster "topology1")
-            new-ed->slot (.getExecutorToSlot new-assignment)]
-        (doseq [ed (.keySet copy-old-mapping)]
-          (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed))))  ;; the assignment for topo1 should not change
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))))))
-
-;; Automated tests for heterogeneous cluster
-(deftest test-heterogeneous-cluster
-  (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list )
-                                                         (map int (list 1 2 3 4))
-                                                         {Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0
-                                                          Config/SUPERVISOR_CPU_CAPACITY 800.0})
-                                     (SupervisorDetails. (str "id" 1) (str "host" 1) (list )
-                                                         (map int (list 1 2 3 4))
-                                                         {Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0
-                                                          Config/SUPERVISOR_CPU_CAPACITY 200.0})]]
-                          {(.getId super) super}))
-        builder1 (TopologyBuilder.)  ;; topo1 has a single huge task that cannot be handled by the small super
-        _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1)
-            (.setMemoryLoad 2000.0 48.0)
-            (.setCPULoad 300.0))
-        storm-topology1 (.createTopology builder1)
-        topology1 (TopologyDetails. "topology1"
-                    {TOPOLOGY-NAME "topology-name-1"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology1
-                    1
-                    (mk-ed-map [["spout1" 0 1]]))
-        builder2 (TopologyBuilder.)  ;; topo2 has 4 large tasks
-        _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4)
-            (.setMemoryLoad 500.0 12.0)
-            (.setCPULoad 100.0))
-        storm-topology2 (.createTopology builder2)
-        topology2 (TopologyDetails. "topology2"
-                    {TOPOLOGY-NAME "topology-name-2"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology2
-                    2
-                    (mk-ed-map [["spout2" 0 4]]))
-        builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G)
-        _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4)
-            (.setMemoryLoad 200.0 56.0)
-            (.setCPULoad 20.0))
-        storm-topology3 (.createTopology builder3)
-        topology3 (TopologyDetails. "topology3"
-                    {TOPOLOGY-NAME "topology-name-3"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology3
-                    2
-                    (mk-ed-map [["spout3" 0 4]]))
-        builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks; each task's mem req does not evenly divide a node's mem capacity
-        _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2)
-            (.setMemoryLoad 100.0 0.0)
-            (.setCPULoad 30.0))
-        storm-topology4 (.createTopology builder4)
-        topology4 (TopologyDetails. "topology4"
-                    {TOPOLOGY-NAME "topology-name-4"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology4
-                    2
-                    (mk-ed-map [["spout4" 0 12]]))
-        builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks; it should exactly use up both the cpu and mem in the cluster
-        _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40)
-            (.setMemoryLoad 100.0 28.0)
-            (.setCPULoad 25.0))
-        storm-topology5 (.createTopology builder5)
-        topology5 (TopologyDetails. "topology5"
-                    {TOPOLOGY-NAME "topology-name-5"
-                     TOPOLOGY-SUBMITTER-USER "userC"
-                     TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                     TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                     TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                     TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0
-                     TOPOLOGY-PRIORITY 0
-                     TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                    storm-topology5
-                    2
-                    (mk-ed-map [["spout5" 0 40]]))
-        epsilon 0.000001
-        topologies (Topologies. (to-top-map [topology1 topology2]))]
-
-    (testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division"
-      (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology1 topology2 topology3]))
-            scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            super->mem-usage (get-super->mem-usage cluster topologies)
-            super->cpu-usage (get-super->cpu-usage cluster topologies)]
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")))
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")))
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology3")))
-        (doseq [super (.values supers)]
-          (let [mem-avail (.getTotalMemory super)
-                mem-used (.get super->mem-usage super)
-                cpu-avail (.getTotalCPU super)
-                cpu-used (.get super->cpu-usage super)]
-            (is (or (<= (Math/abs (- mem-avail mem-used)) epsilon)
-                    (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))
-
-    (testing "Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled"
-      (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology1 topology2 topology4]))
-            scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology1")) 1 0)
-            scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology2")) 1 0))
-            scheduled-topos (+ scheduled-topos (if (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology4")) 1 0))]
-        (is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled
-
-    (testing "Launch topo5 only, both mem and cpu should be exactly used up"
-      (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-            topologies (Topologies. (to-top-map [topology5]))
-            scheduler (ResourceAwareScheduler.)
-            _ (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                                   RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-            _ (.schedule scheduler topologies cluster)
-            super->mem-usage (get-super->mem-usage cluster topologies)
-            super->cpu-usage (get-super->cpu-usage cluster topologies)]
-        (is (= "Running - Fully Scheduled by DefaultResourceAwareStrategy" (.get (.getStatusMap cluster) "topology5")))
-        (doseq [super (.values supers)]
-          (let [mem-avail (.getTotalMemory super)
-                mem-used (.get super->mem-usage super)
-                cpu-avail (.getTotalCPU ^SupervisorDetails super)
-                cpu-used (.get super->cpu-usage super)]
-            (is (and (<= (Math/abs (- mem-avail mem-used)) epsilon)
-                    (<= (Math/abs (- cpu-avail cpu-used)) epsilon)))))))))
-
-(deftest test-topology-worker-max-heap-size
-  (let [supers (gen-supervisors 2 2)]
-    (testing "test if RAS will spread executors across mulitple workers based on the set limit for a worker used by the topology")
-    (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-          scheduler (ResourceAwareScheduler.)
-          builder1 (TopologyBuilder.)
-          _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
-          storm-topology1 (.createTopology builder1)
-          topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userA"
-                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                       TOPOLOGY-PRIORITY 0
-                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                      storm-topology1
-                      1
-                      (mk-ed-map [["spout1" 0 4]]))
-          topologies (Topologies. (to-top-map [topology1]))]
-      (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                           RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-      (.schedule scheduler topologies cluster)
-      (is (= (.get (.getStatusMap cluster) "topology1") "Running - Fully Scheduled by DefaultResourceAwareStrategy"))
-      (is (= (.getAssignedNumWorkers cluster topology1) 4)))
-    (testing "test when no more workers are available due to topology worker max heap size limit but there is memory is still available")
-    (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                              {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                               "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-          scheduler (ResourceAwareScheduler.)
-          builder1 (TopologyBuilder.)
-          _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
-          storm-topology1 (.createTopology builder1)
-          topology1 (TopologyDetails. "topology1"
-                      {TOPOLOGY-NAME "topology-name-1"
-                       TOPOLOGY-SUBMITTER-USER "userC"
-                       TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0
-                       TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                       TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                       TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                       TOPOLOGY-PRIORITY 0
-                       TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-                      storm-topology1
-                      1
-                      (mk-ed-map [["spout1" 0 5]]))
-          topologies (Topologies. (to-top-map [topology1]))]
-      (.prepare scheduler {RESOURCE-AWARE-SCHEDULER-EVICTION-STRATEGY DEFAULT_EVICTION_STRATEGY
-                           RESOURCE-AWARE-SCHEDULER-PRIORITY-STRATEGY DEFAULT_PRIORITY_STRATEGY})
-      (.schedule scheduler topologies cluster)
-      ;; spout1 is going to contain 5 executors that need scheduling. Each of those executors has a memory requirement of 128.0 MB
-      ;; The cluster contains 4 free WorkerSlots. For this topology each worker is limited to a max heap size of 128.0 MB
-      ;; Thus, one executor is not going to be able to get scheduled, which fails the scheduling of this topology, so none of its executors will be scheduled
-      (is (= (.size (.getUnassignedExecutors cluster topology1)) 5))
-      (is (= (.get (.getStatusMap cluster) "topology1")  "Not enough resources to schedule - 0/5 executors scheduled")))
-
-    (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {}
-                             {STORM-NETWORK-TOPOGRAPHY-PLUGIN
-                              "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping"})
-          cluster (LocalCluster.)
-          builder1 (TopologyBuilder.)
-          _ (.setSpout builder1 "spout1" (TestWordSpout.) 2)
-          storm-topology1 (.createTopology builder1)
-          conf  {TOPOLOGY-NAME "topology-name-1"
-                 TOPOLOGY-SUBMITTER-USER "userC"
-                 TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0
-                 TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0
-                 TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0
-                 TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0
-                 TOPOLOGY-PRIORITY 0
-                 TOPOLOGY-SCHEDULER-STRATEGY DEFAULT_SCHEDULING_STRATEGY}
-          topology1 (TopologyDetails. "topology1"
-                      conf
-                      storm-topology1
-                      1
-                      (mk-ed-map [["spout1" 0 5]]))
-          topologies (Topologies. (to-top-map [topology1]))]
-      (is (thrown? IllegalArgumentException
-            (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1)))
-
-  )))
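
For readers following the port: the deleted mk-ed-map helper above turns [component start end] triples into a map from single-task ExecutorDetails to component names. Below is a minimal Java sketch of the same idea (the class and method names here are hypothetical, not necessarily what the ported TestUtilsForResourceAwareScheduler uses):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.scheduler.ExecutorDetails;

    public final class EdMapSketch {
        // Mirror of the Clojure mk-ed-map/ed helpers: each triple is
        // [componentName, startTaskId, endTaskIdExclusive]; every task id in the
        // range becomes a single-task executor, i.e. (ExecutorDetails. id id).
        public static Map<ExecutorDetails, String> mkEdMap(Object[][] triples) {
            Map<ExecutorDetails, String> executorToComp = new HashMap<>();
            for (Object[] triple : triples) {
                String component = (String) triple[0];
                int start = (Integer) triple[1];
                int end = (Integer) triple[2];
                for (int taskId = start; taskId < end; taskId++) {
                    executorToComp.put(new ExecutorDetails(taskId, taskId), component);
                }
            }
            return executorToComp;
        }
    }

A call such as mkEdMap(new Object[][]{{"wordSpout", 0, 1}, {"wordCountBolt", 1, 2}}) then corresponds to the (mk-ed-map [["wordSpout" 0 1] ["wordCountBolt" 1 2]]) calls in the deleted tests. The map-val TODOs in get-super->mem-usage and get-super->cpu-usage are resolved the same way in a later commit in this thread, which merges both helpers into getSupervisorToResourceUsage with plain for loops.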


[7/8] storm git commit: Minor

Posted by zh...@apache.org.
Minor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3c79096f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3c79096f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3c79096f

Branch: refs/heads/master
Commit: 3c79096f6295559ee649e88ff3569a6fae0b5723
Parents: 91f8a45
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 16:38:53 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 16:38:53 2016 -0500

----------------------------------------------------------------------
 .../scheduler/resource/TestResourceAwareScheduler.java      | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3c79096f/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 720f2d6..d81c176 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -432,12 +432,7 @@ public class TestResourceAwareScheduler {
         // pick a worker to mock as failed
         WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
         Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
-        List<ExecutorDetails> failedExecutors = new ArrayList<>();
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
-            if (entry.getValue().equals(failedWorker)) {
-                failedExecutors.add(entry.getKey());
-            }
-        }
+        Collection<ExecutorDetails> failedExecutors = assignment.getSlotToExecutors().get(failedWorker);
         for (ExecutorDetails executor : failedExecutors) {
             executorToSlot.remove(executor); // remove executor details assigned to the failed worker
         }
@@ -527,7 +522,7 @@ public class TestResourceAwareScheduler {
         topologies = new Topologies(topoMap);
         rs.schedule(topologies, cluster1);
 
-        newAssignment = (SchedulerAssignmentImpl)cluster1.getAssignmentById(topology1.getId());
+        newAssignment = cluster1.getAssignmentById(topology1.getId());
         newExecutorToSlot = newAssignment.getExecutorToSlot();
 
         for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
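
The one-liner this commit introduces relies on SchedulerAssignment.getSlotToExecutors(). Assuming that method is simply the inverse grouping of the executor-to-slot map, the deleted manual loop generalizes to a sketch like this (a hypothetical helper, shown only to make the grouping explicit):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.scheduler.ExecutorDetails;
    import org.apache.storm.scheduler.WorkerSlot;

    public final class SlotGrouping {
        // Invert {executor -> slot} into {slot -> executors}; the deleted loop
        // performed this grouping for a single failedWorker slot.
        public static Map<WorkerSlot, Collection<ExecutorDetails>> slotToExecutors(
                Map<ExecutorDetails, WorkerSlot> executorToSlot) {
            Map<WorkerSlot, Collection<ExecutorDetails>> inverse = new HashMap<>();
            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
                Collection<ExecutorDetails> executors = inverse.get(entry.getValue());
                if (executors == null) {
                    executors = new ArrayList<>();
                    inverse.put(entry.getValue(), executors);
                }
                executors.add(entry.getKey());
            }
            return inverse;
        }
    }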


[4/8] storm git commit: Merge two functions

Posted by zh...@apache.org.
Merge two functions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58f1161c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58f1161c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58f1161c

Branch: refs/heads/master
Commit: 58f1161cb077f90fb64e8a68f9da9c9aedf4f7dd
Parents: 6340601
Author: zhuol <zh...@yahoo-inc.com>
Authored: Thu Mar 24 17:31:13 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Thu Mar 24 17:31:13 2016 -0500

----------------------------------------------------------------------
 .../resource/TestResourceAwareScheduler.java    | 10 +++--
 .../TestUtilsForResourceAwareScheduler.java     | 44 ++++----------------
 2 files changed, 13 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 28fd491..9cfdc6e 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -616,8 +616,9 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
         Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology3.getId()));
 
-        Map<SupervisorDetails, Double> superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        Map<SupervisorDetails, Double> superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
 
         final Double EPSILON = 0.0001;
         for (SupervisorDetails supervisor : supMap.values()) {
@@ -658,8 +659,9 @@ public class TestResourceAwareScheduler {
         topologies = new Topologies(topoMap);
         rs.prepare(config1);
         rs.schedule(topologies, cluster);
-        superToCpu = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
-        superToMem = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
+        superToCpu = new HashMap<>();
+        superToMem = new HashMap<>();
+        TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
         for (SupervisorDetails supervisor : supMap.values()) {
             Double cpuAvailable = supervisor.getTotalCPU();
             Double memAvailable = supervisor.getTotalMemory();

http://git-wip-us.apache.org/repos/asf/storm/blob/58f1161c/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index 7cd21ce..8b7f9c8 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -289,46 +289,14 @@ public class TestUtilsForResourceAwareScheduler {
         return ret;
     }
 
-    public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToMem = new HashMap<>();
-        Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
-        Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
-        for (SupervisorDetails supervisor : supervisors) {
-            superToMem.put(supervisor, 0.0);
-        }
-
-        for (SchedulerAssignment assignment : assignments) {
-            Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
-            Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
-            TopologyDetails topology = topologies.getById(assignment.getTopologyId());
-            for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment.getExecutorToSlot().entrySet()) {
-                executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
-            }
-            for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
-                List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
-                if (executorsOnSupervisor == null) {
-                    executorsOnSupervisor = new ArrayList<>();
-                    supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
-                }
-                executorsOnSupervisor.add(entry.getKey());
-            }
-            for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
-                Double supervisorUsedMemory = 0.0;
-                for (ExecutorDetails executor: entry.getValue()) {
-                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
-                }
-                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
-            }
-        }
-        return superToMem;
-    }
-
-    public static Map<SupervisorDetails, Double> getSupervisorToCpuUsage(Cluster cluster, Topologies topologies) {
-        Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
+    public static void getSupervisorToResourceUsage(Cluster cluster, Topologies topologies,
+                                                    Map<SupervisorDetails, Double> superToCpu,
+                                                    Map<SupervisorDetails, Double> superToMem) {
         Collection<SchedulerAssignment> assignments = cluster.getAssignments().values();
         Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
         for (SupervisorDetails supervisor : supervisors) {
             superToCpu.put(supervisor, 0.0);
+            superToMem.put(supervisor, 0.0);
         }
 
         for (SchedulerAssignment assignment : assignments) {
@@ -348,12 +316,14 @@ public class TestUtilsForResourceAwareScheduler {
             }
             for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
                 Double supervisorUsedCpu = 0.0;
+                Double supervisorUsedMemory = 0.0;
                 for (ExecutorDetails executor: entry.getValue()) {
                     supervisorUsedCpu += topology.getTotalCpuReqTask(executor);
+                    supervisorUsedMemory += topology.getTotalMemReqTask(executor);
                 }
                 superToCpu.put(entry.getKey(), superToCpu.get(entry.getKey()) + supervisorUsedCpu);
+                superToMem.put(entry.getKey(), superToMem.get(entry.getKey()) + supervisorUsedMemory);
             }
         }
-        return superToCpu;
     }
 }
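
After the merge, one pass over the assignments fills both caller-supplied maps. Below is a short usage sketch in the spirit of the call sites shown above (the capacity checks mirror the assertions in the old Clojure tests):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.scheduler.Cluster;
    import org.apache.storm.scheduler.SupervisorDetails;
    import org.apache.storm.scheduler.Topologies;
    import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;

    public final class ResourceUsageCheck {
        public static void assertUsageWithinCapacity(Cluster cluster, Topologies topologies) {
            Map<SupervisorDetails, Double> superToCpu = new HashMap<>();
            Map<SupervisorDetails, Double> superToMem = new HashMap<>();
            TestUtilsForResourceAwareScheduler.getSupervisorToResourceUsage(cluster, topologies, superToCpu, superToMem);
            for (SupervisorDetails supervisor : cluster.getSupervisors().values()) {
                // assigned usage must never exceed the supervisor's declared capacity
                assert superToMem.get(supervisor) <= supervisor.getTotalMemory();
                assert superToCpu.get(supervisor) <= supervisor.getTotalCPU();
            }
        }
    }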


[6/8] storm git commit: Modify comment

Posted by zh...@apache.org.
Modify comment


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91f8a456
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91f8a456
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91f8a456

Branch: refs/heads/master
Commit: 91f8a456d99f92aabb50898f6935a7b44015bf40
Parents: 13e8b11
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 25 15:37:41 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 25 15:37:41 2016 -0500

----------------------------------------------------------------------
 .../storm/scheduler/resource/TestResourceAwareScheduler.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/91f8a456/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 5ae1432..720f2d6 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -308,7 +308,7 @@ public class TestResourceAwareScheduler {
         resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
         Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
 
-        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
+        TopologyBuilder builder1 = new TopologyBuilder();
         builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
         builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
         StormTopology stormTopology1 = builder1.createTopology();


[2/8] storm git commit: Minor

Posted by zh...@apache.org.
Minor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a302e3b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a302e3b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a302e3b

Branch: refs/heads/master
Commit: 7a302e3bc5b6652642ed5fb9ff6f4fed8607680f
Parents: c1b93de
Author: zhuol <zh...@yahoo-inc.com>
Authored: Fri Mar 18 16:11:50 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Fri Mar 18 16:11:50 2016 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/Config.java |  2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 20 ++++++++++----------
 .../resource/TestResourceAwareScheduler.java    |  1 -
 3 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 6ea8b0f..05030e8 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -232,7 +232,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * Whether we want to display all the resource capacity and scheduled usage on the UI page.
-     * We suggest to have this variable set if you are using any kind of resource-related scheduler.
+     * You MUST have this variable set if you are using any kind of resource-related scheduler.
      *
      * If this is not set, we will not display resource capacity and usage on the UI.
      */
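
The hunk above only shows the javadoc, not the constant it documents; assuming it is the usual UI toggle (Config.SCHEDULER_DISPLAY_RESOURCE, "scheduler.display.resource", in Storm 1.x), enabling it is a one-liner in the cluster config. A hedged sketch under that assumption:

    import org.apache.storm.Config;

    public class EnableResourceDisplay {   // illustrative
        public static void main(String[] args) {
            Config conf = new Config();
            // Assumed constant: the diff shows only the javadoc; the flag it
            // documents is taken here to be Config.SCHEDULER_DISPLAY_RESOURCE.
            conf.put(Config.SCHEDULER_DISPLAY_RESOURCE, true);
        }
    }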

http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
index c6543d4..ed3d305 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -129,7 +129,7 @@ public class ConfigUtils {
 
     // public static mkStatsSampler // depends on Utils.evenSampler() TODO, this is sth we need to do after util
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static Map readStormConfig() {
         return _instance.readStormConfigImpl();
     }
@@ -235,7 +235,7 @@ public class ConfigUtils {
         return (masterLocalDir(conf) + FILE_SEPARATOR + "inimbus");
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String supervisorLocalDir(Map conf) throws IOException {
         return _instance.supervisorLocalDirImpl(conf);
     }
@@ -250,7 +250,7 @@ public class ConfigUtils {
         return (supervisorLocalDir(conf) + FILE_SEPARATOR + "isupervisor");
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String supervisorStormDistRoot(Map conf) throws IOException {
         return _instance.supervisorStormDistRootImpl(conf);
     }
@@ -259,7 +259,7 @@ public class ConfigUtils {
         return stormDistPath(supervisorLocalDir(conf));
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String supervisorStormDistRoot(Map conf, String stormId) throws IOException {
         return _instance.supervisorStormDistRootImpl(conf, stormId);
     }
@@ -299,7 +299,7 @@ public class ConfigUtils {
         return (concatIfNotNull(stormRoot) + FILE_SEPARATOR + RESOURCES_SUBDIR);
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static LocalState supervisorState(Map conf) throws IOException {
         return _instance.supervisorStateImpl(conf);
     }
@@ -308,7 +308,7 @@ public class ConfigUtils {
         return new LocalState((supervisorLocalDir(conf) + FILE_SEPARATOR + "localstate"));
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static LocalState nimbusTopoHistoryState(Map conf) throws IOException {
         return _instance.nimbusTopoHistoryStateImpl(conf);
     }
@@ -317,7 +317,7 @@ public class ConfigUtils {
         return new LocalState((masterLocalDir(conf) + FILE_SEPARATOR + "history"));
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
         return _instance.readSupervisorStormConfImpl(conf, stormId);
     }
@@ -380,7 +380,7 @@ public class ConfigUtils {
         return ret;
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static void setWorkerUserWSE(Map conf, String workerId, String user) throws IOException {
         _instance.setWorkerUserWSEImpl(conf, workerId, user);
     }
@@ -401,7 +401,7 @@ public class ConfigUtils {
         new File(workerUserFile(conf, workerId)).delete();
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String workerArtifactsRoot(Map conf) {
         return _instance.workerArtifactsRootImpl(conf);
     }
@@ -447,7 +447,7 @@ public class ConfigUtils {
         return new File((logRoot + FILE_SEPARATOR + id + FILE_SEPARATOR + port));
     }
 
-    // we use this "wired" wrapper pattern temporarily for mocking in clojure test
+    // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String workerRoot(Map conf) {
         return _instance.workerRootImpl(conf);
     }
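
The "weird" wrapper pattern these comments all refer to pairs a public static method with an instance-level *Impl method on a swappable singleton: Clojure tests cannot mock a static call directly, so each static entry point forwards to _instance, which a test can replace with a subclass. A distilled, self-contained sketch (class name, setter, and the canned return value are illustrative, not the real org.apache.storm.utils.ConfigUtils API):

    import java.util.HashMap;
    import java.util.Map;

    public class StaticWrapperSketch {
        private static StaticWrapperSketch _instance = new StaticWrapperSketch();

        // Tests install a mocking subclass here and restore the old one after.
        public static StaticWrapperSketch setInstance(StaticWrapperSketch u) {
            StaticWrapperSketch old = _instance;
            _instance = u;
            return old;
        }

        // Stable static entry point used by production code...
        public static Map<String, Object> readStormConfig() {
            return _instance.readStormConfigImpl();
        }

        // ...delegating to an overridable instance method, which a test
        // subclass can override to return a canned config.
        public Map<String, Object> readStormConfigImpl() {
            return new HashMap<>();
        }
    }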

http://git-wip-us.apache.org/repos/asf/storm/blob/7a302e3b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index e0336ea..28fd491 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -63,7 +63,6 @@ public class TestResourceAwareScheduler {
 
     private static final Config defaultTopologyConf = new Config();
 
-
     @BeforeClass
     public static void initConf() {
         defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");


[8/8] storm git commit: Merge branch '1300' of https://github.com/zhuoliu/storm into STORM-1300

Posted by zh...@apache.org.
Merge branch '1300' of https://github.com/zhuoliu/storm into STORM-1300


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da7969ea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da7969ea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da7969ea

Branch: refs/heads/master
Commit: da7969ea946ca9baa633a59454cc9ab0de4542db
Parents: 89a349e 3c79096
Author: zhuol <zh...@yahoo-inc.com>
Authored: Mon Mar 28 14:06:31 2016 -0500
Committer: zhuol <zh...@yahoo-inc.com>
Committed: Mon Mar 28 14:06:31 2016 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/Config.java |   2 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../scheduler/resource_aware_scheduler_test.clj | 738 -------------------
 .../resource/TestResourceAwareScheduler.java    | 680 ++++++++++++++++-
 .../TestUtilsForResourceAwareScheduler.java     |  43 +-
 5 files changed, 732 insertions(+), 751 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/da7969ea/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/da7969ea/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------