Posted to dev@storm.apache.org by zhuoliu <gi...@git.apache.org> on 2016/03/18 22:08:01 UTC

[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

GitHub user zhuoliu opened a pull request:

    https://github.com/apache/storm/pull/1232

    [STORM-1300] backtype.storm.scheduler.resource-aware-scheduler-test

    Port backtype.storm.scheduler.resource-aware-scheduler-test to Java.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhuoliu/storm 1300

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1232.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1232
    
----
commit c1b93de1650d113df0e1d0493780d9915bf3dacc
Author: zhuol <zh...@yahoo-inc.com>
Date:   2016-03-18T21:06:00Z

    [STORM-1300] port backtype.storm.scheduler.resource-aware-scheduler-test to java.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/1232#issuecomment-202427400
  
    @jerrypeng , I have addressed your comments. Could you check back? Thanks.



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57485480
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
    --- End diff --
    
    In the original Clojure code, we did not set the resource requirement for the spout. Is there a reason why we are setting it here?
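
For context on why this matters for the assertions, here is a minimal sketch of the resource arithmetic. It assumes that a component with no explicit setting falls back to the topology defaults put into defaultTopologyConf in initConf (10.0 CPU, 128.0 MB on-heap, 0.0 MB off-heap). The class and method names are hypothetical and do not use any Storm API.

```java
class ResourceRequestSketch {
    // Defaults copied from initConf() in the diff; treating them as the
    // fallback for unset components is an assumption of this sketch.
    static final double DEFAULT_CPU = 10.0;
    static final double DEFAULT_MEM_MB = 128.0 + 0.0; // on-heap + off-heap

    /**
     * Sums the requests of all components. Each row is {cpu, memMb};
     * a null entry means "no explicit setting, use the default".
     * Returns {totalCpu, totalMemMb}.
     */
    static double[] total(Double[][] components) {
        double cpu = 0.0;
        double mem = 0.0;
        for (Double[] c : components) {
            cpu += (c[0] != null) ? c[0] : DEFAULT_CPU;
            mem += (c[1] != null) ? c[1] : DEFAULT_MEM_MB;
        }
        return new double[] {cpu, mem};
    }

    public static void main(String[] args) {
        // Spout and bolt both set explicitly, as in the Java port.
        double[] explicit = total(new Double[][] {{20.0, 200.0}, {20.0, 200.0}});
        // Spout left unset, as the comment says the Clojure test did.
        double[] spoutUnset = total(new Double[][] {{null, null}, {20.0, 200.0}});
        System.out.println("explicit:    cpu=" + explicit[0] + " mem=" + explicit[1]);
        System.out.println("spout unset: cpu=" + spoutUnset[0] + " mem=" + spoutUnset[1]);
    }
}
```

Under these assumptions, only the variant with both components set explicitly adds up to the 400.0 MB and 40.0 CPU that testTopologySetCpuAndMemLoad asserts; leaving the spout unset would change the expected totals.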



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1232



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57486697
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        double assignedMemory = 0.0;
    +        double assignedCpu = 0.0;
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +            assignedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap();
    +            assignedCpu += slot.getAllocatedCpu();
    +
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(2, executors1.size());
    +        Assert.assertEquals(400.0, assignedMemory, 0.001);
    +        Assert.assertEquals(40.0, assignedCpu, 0.001);
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testResourceLimitation() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    --- End diff --
    
    The code comment on this line ("a topology with multiple spouts") seems to be wrong.



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57397001
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---
    @@ -109,7 +112,7 @@
         public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
             Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
             int startTask = 0;
    -        int endTask = 1;
    +        int endTask = 0;
    --- End diff --
    
    This corrects the executor ranges generated in the tests to [0,0], [1,1]. They used to be [0,1], [1,2], which is not what Storm actually produces.
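
    A minimal sketch of the numbering described above, assuming the helper advances both startTask and endTask by one for each executor it emits. The class and method names are hypothetical and only illustrate the effect of the initial endTask value; this is not the real genExecsAndComps code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not the actual test utility.
class ExecutorRangeSketch {
    // Builds inclusive [startTask, endTask] ranges, one per executor.
    // Assumption: both bounds advance by one after each executor.
    static List<int[]> ranges(int executorCount, int startTask, int endTask) {
        List<int[]> out = new ArrayList<>();
        for (int i = 0; i < executorCount; i++) {
            out.add(new int[] {startTask, endTask});
            startTask++;
            endTask++;
        }
        return out;
    }
}
```

    Under that assumption, starting with endTask = 0 gives [0,0], [1,1] (one task per executor, no overlap), while starting with endTask = 1 gives the overlapping ranges [0,1], [1,2].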



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the pull request:

    https://github.com/apache/storm/pull/1232#issuecomment-202514705
  
    +1



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57388801
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---
    @@ -109,7 +112,7 @@
         public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
             Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
             int startTask = 0;
    -        int endTask = 1;
    +        int endTask = 0;
    --- End diff --
    
    What is this change about?



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/1232#issuecomment-201057484
  
    Hi @knusbaum , comments addressed. Thanks.



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57485387
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
    --- End diff --
    
    In the original Clojure code we set the memory load to 110.0. Is there a reason why we changed this?



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57388639
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---
    @@ -285,4 +288,72 @@ public static TopologyDetails findTopologyInSetFromName(String topoName, Set<Top
             }
             return ret;
         }
    +
    +    public static Map<SupervisorDetails, Double> getSupervisorToMemoryUsage(Cluster cluster, Topologies topologies) {
    --- End diff --
    
    This method and getSupervisorToCpuUsage are almost exactly identical. Can you combine them to reduce duplicate code?
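
    One possible shape for the combined helper, as a sketch only. It assumes Java 8 is available and uses a hypothetical Exec stand-in rather than the real Cluster, Topologies and SupervisorDetails types; the only point is that the two methods could share one loop and differ in which resource they read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToDoubleFunction;

// Hypothetical sketch; the types here are stand-ins, not Storm classes.
class UsageSketch {
    // Stand-in for one scheduled executor and its resource request.
    static class Exec {
        final String supervisorId;
        final double memMb;
        final double cpu;

        Exec(String supervisorId, double memMb, double cpu) {
            this.supervisorId = supervisorId;
            this.memMb = memMb;
            this.cpu = cpu;
        }
    }

    // Sums one resource per supervisor; the caller picks the resource.
    static Map<String, Double> usageBySupervisor(List<Exec> execs,
                                                 ToDoubleFunction<Exec> resource) {
        Map<String, Double> usage = new HashMap<>();
        for (Exec e : execs) {
            usage.merge(e.supervisorId, resource.applyAsDouble(e), Double::sum);
        }
        return usage;
    }
}
```

    The memory and CPU variants would then be thin wrappers, for example usageBySupervisor(execs, e -> e.memMb) and usageBySupervisor(execs, e -> e.cpu).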



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57586495
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java ---
    @@ -109,7 +112,7 @@
         public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology, int spoutParallelism, int boltParallelism) {
             Map<ExecutorDetails, String> retMap = new HashMap<ExecutorDetails, String>();
             int startTask = 0;
    -        int endTask = 1;
    +        int endTask = 0;
    --- End diff --
    
    +1



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on the pull request:

    https://github.com/apache/storm/pull/1232#issuecomment-200985657
  
    Hi @jerrypeng @d2r @revans2 , would you mind having a look at this one?



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by zhuoliu <gi...@git.apache.org>.
Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57487640
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with one spout and one bolt
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
    --- End diff --
    
    Hi @jerrypeng , the original clj test only checks the slot and executor counts, so the CPU/memory load values do not actually matter there. This test sets specific CPU and memory values so that we can verify the assigned memory and CPU at the end (either 110 or 200 will work).
    +        Assert.assertEquals(400.0, assignedMemory, 0.001);
    +        Assert.assertEquals(40.0, assignedCpu, 0.001);
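
For readers following along, the expected totals in these two assertions follow from summing the per-executor requests: the topology has two executors (one spout, one bolt), each requesting 200 MB of memory and 20 CPU units. A minimal sketch of that arithmetic, using a hypothetical helper class (`ExpectedLoad` is an illustration only and is not part of Storm or of this PR):

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Storm: sums per-executor resource requests. */
public class ExpectedLoad {

    /** Returns the sum of the given per-executor requests (memory in MB or CPU units). */
    public static double total(List<Double> perExecutorRequests) {
        double sum = 0.0;
        for (double request : perExecutorRequests) {
            sum += request;
        }
        return sum;
    }

    public static void main(String[] args) {
        // One spout executor and one bolt executor, as in testTopologySetCpuAndMemLoad.
        List<Double> memoryMb = Arrays.asList(200.0, 200.0);
        List<Double> cpu = Arrays.asList(20.0, 20.0);

        System.out.println(total(memoryMb)); // 400.0
        System.out.println(total(cpu));      // 40.0
    }
}
```

With a single assigned worker slot, the memory and CPU reported for that slot should match these sums, which is what the test asserts.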



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1232#discussion_r57491392
  
    --- Diff: storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -54,6 +61,681 @@
     
         private static int currentTime = 1450418597;
     
    +    private static final Config defaultTopologyConf = new Config();
    +
    +    @BeforeClass
    +    public static void initConf() {
    +        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    +        defaultTopologyConf.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    +
    +        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 8192.0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_PRIORITY, 0);
    +        defaultTopologyConf.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
    +    }
    +
    +    @Test
    +    public void testRASNodeSlotAssign() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, resourceMap);
    +        Topologies topologies = new Topologies(new HashMap<String, TopologyDetails>());
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap());
    +        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
    +        Assert.assertEquals(5, nodes.size());
    +        RAS_Node node = nodes.get("sup-0");
    +
    +        Assert.assertEquals("sup-0", node.getId());
    +        Assert.assertTrue(node.isAlive());
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors11 = new ArrayList<>();
    +        executors11.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors11);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(3, node.totalSlotsFree());
    +        Assert.assertEquals(1, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors12 = new ArrayList<>();
    +        executors12.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology1, executors12);
    +        Assert.assertEquals(1, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(2, node.totalSlotsFree());
    +        Assert.assertEquals(2, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        TopologyDetails topology2 = TestUtilsForResourceAwareScheduler.getTopology("topology2", new HashMap(), 1, 0, 2, 0, 0, 0);
    +
    +        List<ExecutorDetails> executors21 = new ArrayList<>();
    +        executors21.add(new ExecutorDetails(1, 1));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors21);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(1, node.totalSlotsFree());
    +        Assert.assertEquals(3, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        List<ExecutorDetails> executors22 = new ArrayList<>();
    +        executors22.add(new ExecutorDetails(2, 2));
    +        node.assign(node.getFreeSlots().iterator().next(), topology2, executors22);
    +        Assert.assertEquals(2, node.getRunningTopologies().size());
    +        Assert.assertFalse(node.isTotallyFree());
    +        Assert.assertEquals(0, node.totalSlotsFree());
    +        Assert.assertEquals(4, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +
    +        node.freeAllSlots();
    +        Assert.assertEquals(0, node.getRunningTopologies().size());
    +        Assert.assertTrue(node.isTotallyFree());
    +        Assert.assertEquals(4, node.totalSlotsFree());
    +        Assert.assertEquals(0, node.totalSlotsUsed());
    +        Assert.assertEquals(4, node.totalSlots());
    +    }
    +
    +    @Test
    +    public void sanityTestOfScheduling() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, resourceMap);
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        TopologyDetails topology1 = TestUtilsForResourceAwareScheduler.getTopology("topology1", config, 1, 1, 1, 1, 0, 0);
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots = assignment.getSlots();
    +        Set<String> nodesIDs = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots) {
    +            nodesIDs.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors = assignment.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots.size());
    +        Assert.assertEquals(1, nodesIDs.size());
    +        Assert.assertEquals(2, executors.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologyWithMultipleSpouts() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with multiple spouts
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 1);
    +        builder1.setSpout("wordSpout2", new TestWordSpout(), 1);
    +        builder1.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
    +        builder1.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
    +        builder1.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
    +        builder1.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder(); // a topology with two unconnected partitions
    +        builder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
    +        builder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 1, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config, stormTopology2, 0, executorMap2, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(7, executors1.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +
    +        SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
    +        Set<WorkerSlot> assignedSlots2 = assignment2.getSlots();
    +        Set<String> nodesIDs2 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots2) {
    +            nodesIDs2.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors2 = assignment2.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots2.size());
    +        Assert.assertEquals(1, nodesIDs2.size());
    +        Assert.assertEquals(2, executors2.size());
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology2.getId()));
    +    }
    +
    +    @Test
    +    public void testTopologySetCpuAndMemLoad() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with one spout and one bolt
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(20.0).setMemoryLoad(200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(20.0).setMemoryLoad(200.0);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 1, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        double assignedMemory = 0.0;
    +        double assignedCpu = 0.0;
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +            assignedMemory += slot.getAllocatedMemOnHeap() + slot.getAllocatedMemOffHeap();
    +            assignedCpu += slot.getAllocatedCpu();
    +
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +
    +        Assert.assertEquals(1, assignedSlots1.size());
    +        Assert.assertEquals(1, nodesIDs1.size());
    +        Assert.assertEquals(2, executors1.size());
    +        Assert.assertEquals(400.0, assignedMemory, 0.001);
    +        Assert.assertEquals(40.0, assignedCpu, 0.001);
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testResourceLimitation() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder(); // a topology with one spout and one bolt
    +        builder1.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(250.0).setMemoryLoad(1000.0, 200.0);
    +        builder1.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(100.0).setMemoryLoad(500.0, 100.0);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +
    +        Config config = new Config();
    +        config.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 2, 1);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 2, executorMap1, 0);
    +
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology1.getId(), topology1);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignment assignment1 = cluster.getAssignmentById(topology1.getId());
    +        Set<WorkerSlot> assignedSlots1 = assignment1.getSlots();
    +        Set<String> nodesIDs1 = new HashSet<>();
    +        for (WorkerSlot slot : assignedSlots1) {
    +            nodesIDs1.add(slot.getNodeId());
    +        }
    +        Collection<ExecutorDetails> executors1 = assignment1.getExecutors();
    +        List<Double> assignedExecutorMemory = new ArrayList<>();
    +        List<Double> assignedExecutorCpu = new ArrayList<>();
    +        for (ExecutorDetails executor : executors1) {
    +            assignedExecutorMemory.add(topology1.getTotalMemReqTask(executor));
    +            assignedExecutorCpu.add(topology1.getTotalCpuReqTask(executor));
    +        }
    +        Collections.sort(assignedExecutorCpu);
    +        Collections.sort(assignedExecutorMemory);
    +
    +        Map<ExecutorDetails, SupervisorDetails> executorToSupervisor = new HashMap<>();
    +        Map<SupervisorDetails, List<ExecutorDetails>> supervisorToExecutors = new HashMap<>();
    +        Map<Double, Double> cpuAvailableToUsed = new HashMap<>();
    +        Map<Double, Double> memoryAvailableToUsed = new HashMap<>();
    +
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : assignment1.getExecutorToSlot().entrySet()) {
    +            executorToSupervisor.put(entry.getKey(), cluster.getSupervisorById(entry.getValue().getNodeId()));
    +        }
    +        for (Map.Entry<ExecutorDetails, SupervisorDetails> entry : executorToSupervisor.entrySet()) {
    +            List<ExecutorDetails> executorsOnSupervisor = supervisorToExecutors.get(entry.getValue());
    +            if (executorsOnSupervisor == null) {
    +                executorsOnSupervisor = new ArrayList<>();
    +                supervisorToExecutors.put(entry.getValue(), executorsOnSupervisor);
    +            }
    +            executorsOnSupervisor.add(entry.getKey());
    +        }
    +        for (Map.Entry<SupervisorDetails, List<ExecutorDetails>> entry : supervisorToExecutors.entrySet()) {
    +            Double supervisorTotalCpu = entry.getKey().getTotalCPU();
    +            Double supervisorTotalMemory = entry.getKey().getTotalMemory();
    +            Double supervisorUsedCpu = 0.0;
    +            Double supervisorUsedMemory = 0.0;
    +            for (ExecutorDetails executor : entry.getValue()) {
    +                supervisorUsedCpu += topology1.getTotalCpuReqTask(executor);
    +                supervisorUsedMemory += topology1.getTotalMemReqTask(executor);
    +            }
    +            cpuAvailableToUsed.put(supervisorTotalCpu, supervisorUsedCpu);
    +            memoryAvailableToUsed.put(supervisorTotalMemory, supervisorUsedMemory);
    +        }
    +        // executor0 resides on one worker (on one node), executor1 and executor2 on another worker (on the other node)
    +        Assert.assertEquals(2, assignedSlots1.size());
    +        Assert.assertEquals(2, nodesIDs1.size());
    +        Assert.assertEquals(3, executors1.size());
    +
    +        Assert.assertEquals(100.0, assignedExecutorCpu.get(0), 0.001);
    +        Assert.assertEquals(250.0, assignedExecutorCpu.get(1), 0.001);
    +        Assert.assertEquals(250.0, assignedExecutorCpu.get(2), 0.001);
    +        Assert.assertEquals(600.0, assignedExecutorMemory.get(0), 0.001);
    +        Assert.assertEquals(1200.0, assignedExecutorMemory.get(1), 0.001);
    +        Assert.assertEquals(1200.0, assignedExecutorMemory.get(2), 0.001);
    +
    +        for (Map.Entry<Double, Double> entry : memoryAvailableToUsed.entrySet()) {
    +            Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
    +        }
    +        for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
    +            Assert.assertTrue(entry.getKey() - entry.getValue() >= 0);
    +        }
    +        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topology1.getId()));
    +    }
    +
    +    @Test
    +    public void testScheduleResilience() {
    +        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
    +        Map<String, Number> resourceMap = new HashMap<>();
    +        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 400.0);
    +        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 2000.0);
    +        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, resourceMap);
    +
    +        TopologyBuilder builder1 = new TopologyBuilder();
    +        builder1.setSpout("wordSpout1", new TestWordSpout(), 3);
    +        StormTopology stormTopology1 = builder1.createTopology();
    +        Config config1 = new Config();
    +        config1.putAll(defaultTopologyConf);
    +        Map<ExecutorDetails, String> executorMap1 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology1, 3, 0);
    +        TopologyDetails topology1 = new TopologyDetails("topology1", config1, stormTopology1, 3, executorMap1, 0);
    +
    +        TopologyBuilder builder2 = new TopologyBuilder();
    +        builder2.setSpout("wordSpout2", new TestWordSpout(), 2);
    +        StormTopology stormTopology2 = builder2.createTopology();
    +        Config config2 = new Config();
    +        config2.putAll(defaultTopologyConf);
    +        // memory requirement is large enough so that two executors can not be fully assigned to one node
    +        config2.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 1280.0);
    +        Map<ExecutorDetails, String> executorMap2 = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology2, 2, 0);
    +        TopologyDetails topology2 = new TopologyDetails("topology2", config2, stormTopology2, 2, executorMap2, 0);
    +
    +        // Test1: When a worker fails, RAS does not alter existing assignments on healthy workers
    +        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config1);
    +        ResourceAwareScheduler rs = new ResourceAwareScheduler();
    +        Map<String, TopologyDetails> topoMap = new HashMap<>();
    +        topoMap.put(topology2.getId(), topology2);
    +        Topologies topologies = new Topologies(topoMap);
    +
    +        rs.prepare(config1);
    +        rs.schedule(topologies, cluster);
    +
    +        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)cluster.getAssignmentById(topology2.getId());
    +        // pick a worker to mock as failed
    +        WorkerSlot failedWorker = new ArrayList<WorkerSlot>(assignment.getSlots()).get(0);
    +        Map<ExecutorDetails, WorkerSlot> executorToSlot = assignment.getExecutorToSlot();
    +        List<ExecutorDetails> failedExecutors = new ArrayList<>();
    +        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : executorToSlot.entrySet()) {
    --- End diff --
    
    There is a method in SchedulerAssignmentImpl called getSlotToExecutors that you can use.
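
To illustrate the suggestion: the loop in the diff walks the executor-to-slot map to find the executors on the failed worker, while a slot-to-executors grouping makes that a single lookup. The sketch below shows the grouping with plain Java collections. It is an assumption-laden illustration: `SlotGrouping` and `groupBySlot` are hypothetical names, strings stand in for `ExecutorDetails` and `WorkerSlot`, and the exact return type of `getSlotToExecutors` is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in, not Storm code: inverts an executor-to-slot map. */
public class SlotGrouping {

    /** Groups executors by the slot they are assigned to. */
    public static <E, S> Map<S, List<E>> groupBySlot(Map<E, S> executorToSlot) {
        Map<S, List<E>> slotToExecutors = new HashMap<>();
        for (Map.Entry<E, S> entry : executorToSlot.entrySet()) {
            slotToExecutors
                .computeIfAbsent(entry.getValue(), slot -> new ArrayList<>())
                .add(entry.getKey());
        }
        return slotToExecutors;
    }

    public static void main(String[] args) {
        // Strings stand in for ExecutorDetails (keys) and WorkerSlot (values).
        Map<String, String> executorToSlot = new HashMap<>();
        executorToSlot.put("exec-1", "slot-A");
        executorToSlot.put("exec-2", "slot-A");
        executorToSlot.put("exec-3", "slot-B");

        // With the grouping available, the failed worker's executors are one lookup away.
        List<String> failedExecutors = groupBySlot(executorToSlot).get("slot-A");
        System.out.println(failedExecutors.size()); // 2
    }
}
```

If the Storm method behaves as the comment implies, the test could replace its manual loop with a lookup keyed by the failed worker slot.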



[GitHub] storm pull request: [STORM-1300] backtype.storm.scheduler.resource...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the pull request:

    https://github.com/apache/storm/pull/1232#issuecomment-202448900
  
    +1

