You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/04/14 17:41:50 UTC

storm git commit: Merge branch 'STORM-1681' of https://github.com/jerrypeng/storm

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 374934c16 -> b67f42a08


Merge branch 'STORM-1681' of https://github.com/jerrypeng/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b67f42a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b67f42a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b67f42a0

Branch: refs/heads/1.x-branch
Commit: b67f42a088e87e70d764247fcb0ed46c222d98b8
Parents: 374934c
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Apr 14 10:08:28 2016 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Apr 14 10:30:31 2016 -0500

----------------------------------------------------------------------
 .../DefaultResourceAwareStrategy.java           |  9 +--
 .../resource/TestResourceAwareScheduler.java    | 61 ++++++++++++++++++++
 2 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b67f42a0/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9a12f80..c4ce7ef 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -316,23 +316,24 @@ public class DefaultResourceAwareStrategy implements IStrategy {
     private Queue<Component> bfs(TopologyDetails td, List<Component> spouts) {
         // Since queue is a interface
         Queue<Component> ordered__Component_list = new LinkedList<Component>();
-        HashMap<String, Component> visited = new HashMap<>();
+        HashSet<String> visited = new HashSet<>();
 
         /* start from each spout that is not visited, each does a breadth-first traverse */
         for (Component spout : spouts) {
-            if (!visited.containsKey(spout.id)) {
+            if (!visited.contains(spout.id)) {
                 Queue<Component> queue = new LinkedList<>();
+                visited.add(spout.id);
                 queue.offer(spout);
                 while (!queue.isEmpty()) {
                     Component comp = queue.poll();
-                    visited.put(comp.id, comp);
                     ordered__Component_list.add(comp);
                     List<String> neighbors = new ArrayList<>();
                     neighbors.addAll(comp.children);
                     neighbors.addAll(comp.parents);
                     for (String nbID : neighbors) {
-                        if (!visited.containsKey(nbID)) {
+                        if (!visited.contains(nbID)) {
                             Component child = td.getComponents().get(nbID);
+                            visited.add(nbID);
                             queue.offer(child);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b67f42a0/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index d1c261b..9f8b980 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -29,6 +29,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
@@ -1430,4 +1434,61 @@ public class TestResourceAwareScheduler {
         Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
         Assert.assertEquals("Topo-3 all executors scheduled?", 3, cluster.getAssignmentById(topo3.getId()).getExecutorToSlot().size());
     }
+
+    /**
+     * Test multiple spouts and cyclic topologies
+     */
+    @Test
+    public void TestMultipleSpoutsAndCyclicTopologies() {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        SpoutDeclarer s1 = builder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(),
+                5);
+        SpoutDeclarer s2 = builder.setSpout("spout-2", new TestUtilsForResourceAwareScheduler.TestSpout(),
+                5);
+        BoltDeclarer b1 = builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                5).shuffleGrouping("spout-1").shuffleGrouping("bolt-3");
+        BoltDeclarer b2 = builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                5).shuffleGrouping("bolt-1");
+        BoltDeclarer b3 = builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+                5).shuffleGrouping("bolt-2").shuffleGrouping("spout-2");
+
+        INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+        Map<String, Number> resourceMap = new HashMap<String, Number>();
+        resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+        resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+        Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(25, 1, resourceMap);
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+        config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+        StormTopology stormTopology = builder.createTopology();
+        TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology,
+                0,
+                TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0);
+
+        Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+        config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+        Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+        topoMap.put(topo.getId(), topo);
+
+        Topologies topologies = new Topologies(topoMap);
+
+        ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+        rs.prepare(config);
+        rs.schedule(topologies, cluster);
+
+        Assert.assertTrue("Topo scheduled?", cluster.getAssignmentById(topo.getId()) != null);
+        Assert.assertEquals("Topo all executors scheduled?", 25, cluster.getAssignmentById(topo.getId()).getExecutorToSlot().size());
+    }
 }