You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:33 UTC
[05/23] storm git commit: adding checkpointing
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index 73a8c73..b41b039 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -138,7 +138,6 @@ public class TestResourceAwareScheduler {
topoMap.put(topo6.getId(), topo6);
topologies = new Topologies(topoMap);
- rs = new ResourceAwareScheduler();
rs.prepare(config);
rs.schedule(topologies, cluster);
@@ -293,7 +292,7 @@ public class TestResourceAwareScheduler {
int fullyScheduled = 0;
for (TopologyDetails topo : topoMap.values()) {
- if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+ if (TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))) {
fullyScheduled++;
}
}
@@ -302,4 +301,401 @@ public class TestResourceAwareScheduler {
Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
}
+
+ /**
+ * The resources in the cluster are limited. In the first round of scheduling, all resources in the cluster are used.
+ * User jerry then submits another topology. Since user jerry is still within his resource guarantee, and user bobby
+ * has exceeded his resource guarantee, topo-3 from user bobby should be evicted.
+ */
+ @Test
+ public void testEviction() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 200.0);
+ resourceUserPool.get("jerry").put("memory", 2000.0);
+
+ resourceUserPool.put("bobby", new HashMap<String, Number>());
+ resourceUserPool.get("bobby").put("cpu", 100.0);
+ resourceUserPool.get("bobby").put("memory", 1000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 200.0);
+ resourceUserPool.get("derek").put("memory", 2000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+ //user jerry submits another topology
+ topoMap.put(topo6.getId(), topo6);
+ topologies = new Topologies(topoMap);
+
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ }
+
+ @Test
+ public void TestEvictMultipleTopologies() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 200.0);
+ resourceUserPool.get("jerry").put("memory", 2000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 100.0);
+ resourceUserPool.get("derek").put("memory", 1000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //user jerry submits another topology
+ topoMap.put(topo1.getId(), topo1);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+ }
+
+ /**
+ * Eviction order:
+ * topo-3: since user bobby doesn't have any resource guarantees and topo-3 is the lowest priority for user bobby
+ * topo-2: since user bobby doesn't have any resource guarantees and topo-2 is the next lowest priority for user bobby
+ * topo-4: since user derek has exceeded his resource guarantee while user jerry has not. topo-4 and topo-5 have the same priority;
+ * the test expects topo-4, the more recently launched of the two (see the launch times below), to be the one evicted
+ */
+ @Test
+ public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+ resourceUserPool.put("jerry", new HashMap<String, Number>());
+ resourceUserPool.get("jerry").put("cpu", 300.0);
+ resourceUserPool.get("jerry").put("memory", 3000.0);
+
+ resourceUserPool.put("derek", new HashMap<String, Number>());
+ resourceUserPool.get("derek").put("cpu", 100.0);
+ resourceUserPool.get("derek").put("memory", 1000.0);
+
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+ TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+ TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+ topoMap.put(topo4.getId(), topo4);
+ topoMap.put(topo5.getId(), topo5);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+ //user jerry submits another topology
+ topoMap.put(topo1.getId(), topo1);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+ Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
+
+ topoMap.put(topo6.getId(), topo6);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+
+ topoMap.put(topo7.getId(), topo7);
+ topologies = new Topologies(topoMap);
+ rs.schedule(topologies, cluster);
+
+ for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+ Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+ Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+ Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
+
+ for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+ Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+ }
+ Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+ Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+ Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+ Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+ Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index d4177c3..7721300 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -22,6 +22,7 @@ import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologySummary;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.INimbus;
import backtype.storm.scheduler.IScheduler;
@@ -53,6 +54,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class TestUtilsForResourceAwareScheduler {
private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
@@ -131,6 +137,7 @@ public class TestUtilsForResourceAwareScheduler {
conf.putAll(config);
conf.put(Config.TOPOLOGY_PRIORITY, priority);
conf.put(Config.TOPOLOGY_NAME, name);
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
StormTopology topology = buildTopology(numSpout,numBolt, spoutParallelism, boltParallelism);
TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
0,
@@ -262,4 +269,25 @@ public class TestUtilsForResourceAwareScheduler {
return null;
}
}
+
+ private static boolean isContain(String source, String subItem){
+ String pattern = "\\b"+subItem+"\\b";
+ Pattern p=Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
+ Matcher m=p.matcher(source);
+ return m.find();
+ }
+
+ public static boolean assertStatusSuccess(String status) {
+ return isContain(status, "fully") && isContain(status, "scheduled") && !isContain(status, "unsuccessful");
+ }
+
+ public static TopologyDetails findTopologyInSetFromName(String topoName, Set<TopologyDetails> set) {
+ TopologyDetails ret = null;
+ for(TopologyDetails entry : set) {
+ if(entry.getName().equals(topoName)) {
+ ret = entry;
+ }
+ }
+ return ret;
+ }
}