You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:33 UTC

[05/23] storm git commit: adding checkpointing

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
index 73a8c73..b41b039 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -138,7 +138,6 @@ public class TestResourceAwareScheduler {
         topoMap.put(topo6.getId(), topo6);
 
         topologies = new Topologies(topoMap);
-        rs = new ResourceAwareScheduler();
         rs.prepare(config);
         rs.schedule(topologies, cluster);
 
@@ -293,7 +292,7 @@ public class TestResourceAwareScheduler {
 
         int fullyScheduled = 0;
         for (TopologyDetails topo : topoMap.values()) {
-            if(cluster.getStatusMap().get(topo.getId()).equals("Fully Scheduled")) {
+            if (TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId()))) {
                 fullyScheduled++;
             }
         }
@@ -302,4 +301,401 @@ public class TestResourceAwareScheduler {
         Assert.assertEquals("# of topologies running", 1, rs.getUser("jerry").getTopologiesRunning().size());
         Assert.assertEquals("# of topologies schedule pending", 0, rs.getUser("jerry").getTopologiesPending().size());
     }
+
+    /**
+     * The resources in the cluster is limited. In the first round of scheduling, all resources in the cluster is used.
+     * User jerry submits another toplogy.  Since user jerry has has his resource guarantees satisfied, and user bobby
+     * has exceeded his resource guarantee, topo-3 from user bobby should be eviced.
+     */
+    @Test
+    public void testEviction() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 200.0);
+        resourceUserPool.get("jerry").put("memory", 2000.0);
+
+        resourceUserPool.put("bobby", new HashMap<String, Number>());
+        resourceUserPool.get("bobby").put("cpu", 100.0);
+        resourceUserPool.get("bobby").put("memory", 1000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 200.0);
+        resourceUserPool.get("derek").put("memory", 2000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo1.getId(), topo1);
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+
+        //user jerry submits another topology
+        topoMap.put(topo6.getId(), topo6);
+        topologies = new Topologies(topoMap);
+
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+    }
+
+    @Test
+    public void TestEvictMultipleTopologies() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 200.0);
+        resourceUserPool.get("jerry").put("memory", 2000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 100.0);
+        resourceUserPool.get("derek").put("memory", 1000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 2, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        //user jerry submits another topology
+        topoMap.put(topo1.getId(), topo1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("correct topology to evict", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+    }
+
+    /**
+     * Eviction order:
+     * topo-3: since user bobby don't have any resource guarantees and topo-3 is the lowest priority for user bobby
+     * topo-2: since user bobby don't have any resource guarantees and topo-2 is the next lowest priority for user bobby
+     * topo-5: since user derek has exceeded his resource guarantee while user jerry has not.  topo-5 and topo-4 has the same priority
+     * but topo-4 was submitted earlier thus we choose that one to evict
+     */
+    @Test
+    public void TestEvictMultipleTopologiesFromMultipleUsersInCorrectOrder() {
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        Map<String, Map<String, Number>> resourceUserPool = new HashMap<String, Map<String, Number>>();
+        resourceUserPool.put("jerry", new HashMap<String, Number>());
+        resourceUserPool.get("jerry").put("cpu", 300.0);
+        resourceUserPool.get("jerry").put("memory", 3000.0);
+
+        resourceUserPool.put("derek", new HashMap<String, Number>());
+        resourceUserPool.get("derek").put("cpu", 100.0);
+        resourceUserPool.get("derek").put("memory", 1000.0);
+
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS, resourceUserPool);
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo6 = TestUtilsForResourceAwareScheduler.getTopology("topo-6", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo7 = TestUtilsForResourceAwareScheduler.getTopology("topo-7", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "bobby");
+
+        TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 10);
+        TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 20);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "derek");
+
+        TopologyDetails topo4 = TestUtilsForResourceAwareScheduler.getTopology("topo-4", config, 1, 0, 1, 0, Time.currentTimeSecs() - 2, 30);
+        TopologyDetails topo5 = TestUtilsForResourceAwareScheduler.getTopology("topo-5", config, 1, 0, 1, 0, Time.currentTimeSecs() - 15, 30);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo2.getId(), topo2);
+        topoMap.put(topo3.getId(), topo3);
+        topoMap.put(topo4.getId(), topo4);
+        topoMap.put(topo5.getId(), topo5);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("bobby").getTopologiesAttempted().size());
+
+        //user jerry submits another topology
+        topoMap.put(topo1.getId(), topo1);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("bobby").getTopologiesRunning().size());
+        Assert.assertEquals("correct topology to evict", rs.getUser("bobby").getTopologiesAttempted().iterator().next().getName(), "topo-3");
+
+        topoMap.put(topo6.getId(), topo6);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 2, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse(TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+
+        topoMap.put(topo7.getId(), topo7);
+        topologies = new Topologies(topoMap);
+        rs.schedule(topologies, cluster);
+
+        for (TopologyDetails topo : rs.getUser("jerry").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 3, rs.getUser("jerry").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("jerry").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 0, rs.getUser("jerry").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("jerry").getTopologiesInvalid().size());
+
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesRunning()) {
+            Assert.assertTrue("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        for (TopologyDetails topo : rs.getUser("derek").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of running topologies", 1, rs.getUser("derek").getTopologiesRunning().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("derek").getTopologiesPending().size());
+        Assert.assertEquals("# of attempted topologies", 1, rs.getUser("derek").getTopologiesAttempted().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("derek").getTopologiesInvalid().size());
+        Assert.assertEquals("correct topology to evict", rs.getUser("derek").getTopologiesAttempted().iterator().next().getName(), "topo-4");
+
+        for (TopologyDetails topo : rs.getUser("bobby").getTopologiesAttempted()) {
+            Assert.assertFalse("Assert scheduling topology success", TestUtilsForResourceAwareScheduler.assertStatusSuccess(cluster.getStatusMap().get(topo.getId())));
+        }
+        Assert.assertEquals("# of attempted topologies", 2, rs.getUser("bobby").getTopologiesAttempted().size());
+        Assert.assertEquals("# of pending topologies", 0, rs.getUser("bobby").getTopologiesPending().size());
+        Assert.assertEquals("# of invalid topologies", 0, rs.getUser("bobby").getTopologiesInvalid().size());
+        Assert.assertEquals("# of running topologies", 0, rs.getUser("bobby").getTopologiesRunning().size());
+
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-2", rs.getUser("bobby").getTopologiesAttempted()) != null);
+        Assert.assertTrue("correct topology to evict", TestUtilsForResourceAwareScheduler.findTopologyInSetFromName("topo-3", rs.getUser("bobby").getTopologiesAttempted()) != null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3c864c/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
index d4177c3..7721300 100644
--- a/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
+++ b/storm-core/test/jvm/backtype/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java
@@ -22,6 +22,7 @@ import backtype.storm.Config;
 import backtype.storm.generated.Bolt;
 import backtype.storm.generated.SpoutSpec;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologySummary;
 import backtype.storm.scheduler.ExecutorDetails;
 import backtype.storm.scheduler.INimbus;
 import backtype.storm.scheduler.IScheduler;
@@ -53,6 +54,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TestUtilsForResourceAwareScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(TestUtilsForResourceAwareScheduler.class);
@@ -131,6 +137,7 @@ public class TestUtilsForResourceAwareScheduler {
         conf.putAll(config);
         conf.put(Config.TOPOLOGY_PRIORITY, priority);
         conf.put(Config.TOPOLOGY_NAME, name);
+        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
         StormTopology topology = buildTopology(numSpout,numBolt, spoutParallelism, boltParallelism);
         TopologyDetails topo = new TopologyDetails(name + "-" + launchTime, conf, topology,
                 0,
@@ -262,4 +269,25 @@ public class TestUtilsForResourceAwareScheduler {
             return null;
         }
     }
+
+    private static boolean isContain(String source, String subItem){
+        String pattern = "\\b"+subItem+"\\b";
+        Pattern p=Pattern.compile(pattern, Pattern.CASE_INSENSITIVE);
+        Matcher m=p.matcher(source);
+        return m.find();
+    }
+
+    public static boolean assertStatusSuccess(String status) {
+        return isContain(status, "fully") && isContain(status, "scheduled") && !isContain(status, "unsuccessful");
+    }
+
+    public static TopologyDetails findTopologyInSetFromName(String topoName, Set<TopologyDetails> set) {
+        TopologyDetails ret = null;
+        for(TopologyDetails entry : set) {
+            if(entry.getName().equals(topoName)) {
+                ret = entry;
+            }
+        }
+        return ret;
+    }
 }