You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by je...@apache.org on 2016/04/14 17:41:50 UTC
storm git commit: Merge branch 'STORM-1681' of
https://github.com/jerrypeng/storm
Repository: storm
Updated Branches:
refs/heads/1.x-branch 374934c16 -> b67f42a08
Merge branch 'STORM-1681' of https://github.com/jerrypeng/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b67f42a0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b67f42a0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b67f42a0
Branch: refs/heads/1.x-branch
Commit: b67f42a088e87e70d764247fcb0ed46c222d98b8
Parents: 374934c
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Thu Apr 14 10:08:28 2016 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Thu Apr 14 10:30:31 2016 -0500
----------------------------------------------------------------------
.../DefaultResourceAwareStrategy.java | 9 +--
.../resource/TestResourceAwareScheduler.java | 61 ++++++++++++++++++++
2 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b67f42a0/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9a12f80..c4ce7ef 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -316,23 +316,24 @@ public class DefaultResourceAwareStrategy implements IStrategy {
private Queue<Component> bfs(TopologyDetails td, List<Component> spouts) {
// Since queue is a interface
Queue<Component> ordered__Component_list = new LinkedList<Component>();
- HashMap<String, Component> visited = new HashMap<>();
+ HashSet<String> visited = new HashSet<>();
/* start from each spout that is not visited, each does a breadth-first traverse */
for (Component spout : spouts) {
- if (!visited.containsKey(spout.id)) {
+ if (!visited.contains(spout.id)) {
Queue<Component> queue = new LinkedList<>();
+ visited.add(spout.id);
queue.offer(spout);
while (!queue.isEmpty()) {
Component comp = queue.poll();
- visited.put(comp.id, comp);
ordered__Component_list.add(comp);
List<String> neighbors = new ArrayList<>();
neighbors.addAll(comp.children);
neighbors.addAll(comp.parents);
for (String nbID : neighbors) {
- if (!visited.containsKey(nbID)) {
+ if (!visited.contains(nbID)) {
Component child = td.getComponents().get(nbID);
+ visited.add(nbID);
queue.offer(child);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b67f42a0/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index d1c261b..9f8b980 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -29,6 +29,10 @@ import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
@@ -1430,4 +1434,61 @@ public class TestResourceAwareScheduler {
Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
Assert.assertEquals("Topo-3 all executors scheduled?", 3, cluster.getAssignmentById(topo3.getId()).getExecutorToSlot().size());
}
+
+ /**
+ * Test multiple spouts and cyclic topologies
+ */
+ @Test
+ public void TestMultipleSpoutsAndCyclicTopologies() {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ SpoutDeclarer s1 = builder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(),
+ 5);
+ SpoutDeclarer s2 = builder.setSpout("spout-2", new TestUtilsForResourceAwareScheduler.TestSpout(),
+ 5);
+ BoltDeclarer b1 = builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ 5).shuffleGrouping("spout-1").shuffleGrouping("bolt-3");
+ BoltDeclarer b2 = builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ 5).shuffleGrouping("bolt-1");
+ BoltDeclarer b3 = builder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ 5).shuffleGrouping("bolt-2").shuffleGrouping("spout-2");
+
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(25, 1, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+
+ StormTopology stormTopology = builder.createTopology();
+ TopologyDetails topo = new TopologyDetails("topo-1", config, stormTopology,
+ 0,
+ TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology, 5, 5), 0);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo.getId(), topo);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ Assert.assertTrue("Topo scheduled?", cluster.getAssignmentById(topo.getId()) != null);
+ Assert.assertEquals("Topo all executors scheduled?", 25, cluster.getAssignmentById(topo.getId()).getExecutorToSlot().size());
+ }
}