You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/04 14:23:24 UTC
[storm] branch master updated: [STORM-3685] Detect and prevent
cycles when Topology is submitted. (#3322)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 8399edc [STORM-3685] Detect and prevent cycles when Topology is submitted. (#3322)
8399edc is described below
commit 8399edcfbb06b484f7b06a08d4d75e0a2c8d4e86
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Fri Sep 4 09:23:06 2020 -0500
[STORM-3685] Detect and prevent cycles when Topology is submitted. (#3322)
---
.../src/jvm/org/apache/storm/StormSubmitter.java | 33 +--
.../jvm/org/apache/storm/daemon/StormCommon.java | 13 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 131 +++++++++++
.../test/jvm/org/apache/storm/utils/UtilsTest.java | 254 ++++++++++++++++++++-
4 files changed, 407 insertions(+), 24 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index f782272..7a6acbe 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import org.apache.storm.dependency.DependencyPropertiesParser;
import org.apache.storm.dependency.DependencyUploader;
import org.apache.storm.generated.AlreadyAliveException;
@@ -131,7 +132,7 @@ public class StormSubmitter {
*/
public static boolean pushCredentials(String name, Map<String, Object> topoConf, Map<String, String> credentials, String expectedUser)
throws AuthorizationException, NotAliveException, InvalidTopologyException {
- topoConf = new HashMap(topoConf);
+ topoConf = new HashMap<>(topoConf);
topoConf.putAll(Utils.readCommandLineOpts());
Map<String, Object> conf = Utils.readStormConfig();
conf.putAll(topoConf);
@@ -166,7 +167,7 @@ public class StormSubmitter {
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws AuthorizationException if authorization is failed
- * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
*/
public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -183,7 +184,7 @@ public class StormSubmitter {
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws AuthorizationException if authorization is failed
- * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
*/
public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -197,11 +198,11 @@ public class StormSubmitter {
* @param topoConf the topology-specific configuration. See {@link Config}.
* @param topology the processing to execute.
* @param opts to manipulate the starting of the topology
- * @param progressListener to track the progress of the jar upload process
+ * @param progressListener to track the progress of the jar upload process {@link ProgressListener}
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws AuthorizationException if authorization is failed
- * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
*/
@SuppressWarnings("unchecked")
public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts,
@@ -215,7 +216,7 @@ public class StormSubmitter {
*
* @param asUser The user as which this topology should be submitted.
* @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. validateConfs validates confs
- * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
*/
public static void submitTopologyAs(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts,
ProgressListener progressListener, String asUser)
@@ -227,13 +228,19 @@ public class StormSubmitter {
if (!Utils.isValidConf(topoConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
- topoConf = new HashMap(topoConf);
+ topoConf = new HashMap<>(topoConf);
topoConf.putAll(Utils.readCommandLineOpts());
Map<String, Object> conf = Utils.readStormConfig();
conf.putAll(topoConf);
topoConf.putAll(prepareZookeeperAuthentication(conf));
- validateConfs(conf, topology);
+ validateConfs(conf);
+
+ try {
+ Utils.validateCycleFree(topology, name);
+ } catch (InvalidTopologyException ex) {
+ LOG.warn("", ex);
+ }
Map<String, String> passedCreds = new HashMap<>();
if (opts != null) {
@@ -355,7 +362,7 @@ public class StormSubmitter {
/**
* Invoke submitter hook.
- * @thorws SubmitterHookException This is thrown when any Exception occurs during initialization or invocation of registered {@link
+ * @throws SubmitterHookException This is thrown when any Exception occurs during initialization or invocation of registered {@link
* ISubmitterHook}
*/
private static void invokeSubmitterHook(String name, String asUser, Map<String, Object> topoConf, StormTopology topology) {
@@ -407,7 +414,7 @@ public class StormSubmitter {
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
* @throws AuthorizationException if authorization is failed
- * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+ * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
*/
public static void submitTopologyWithProgressBar(String name, Map<String, Object> topoConf, StormTopology topology,
SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException,
@@ -444,10 +451,6 @@ public class StormSubmitter {
}
}
- private static String submitJar(Map<String, Object> conf, ProgressListener listener) {
- return submitJar(conf, System.getProperty("storm.jar"), listener);
- }
-
/**
* Submit jar file.
*
@@ -524,7 +527,7 @@ public class StormSubmitter {
}
}
- private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException,
+ private static void validateConfs(Map<String, Object> topoConf) throws IllegalArgumentException,
InvalidTopologyException, AuthorizationException {
ConfigValidation.validateTopoConf(topoConf);
Utils.validateTopologyBlobStoreMap(topoConf);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index 15efada..4d11431 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.Thrift;
@@ -91,7 +92,7 @@ public class StormCommon {
}
}
- private static Set<String> validateIds(Map<String, ? extends Object> componentMap) throws InvalidTopologyException {
+ private static Set<String> validateIds(Map<String, ?> componentMap) throws InvalidTopologyException {
Set<String> keys = componentMap.keySet();
for (String id : keys) {
if (Utils.isSystemId(id)) {
@@ -329,7 +330,7 @@ public class StormCommon {
public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
- Set<String> allIds = new HashSet<String>();
+ Set<String> allIds = new HashSet<>();
allIds.addAll(topology.get_bolts().keySet());
allIds.addAll(topology.get_spouts().keySet());
@@ -374,14 +375,14 @@ public class StormCommon {
List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
if (registerInfo != null) {
- Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+ Map<String, Integer> classOccurrencesMap = new HashMap<>();
for (Map<String, Object> info : registerInfo) {
String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
- Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+ Map<String, Object> metricsConsumerConf = new HashMap<>();
metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
List<String> whitelist = (List<String>) info.get(
TOPOLOGY_METRICS_CONSUMER_WHITELIST);
@@ -398,7 +399,7 @@ public class StormCommon {
Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
boltInstance, null, phintNum, metricsConsumerConf);
- String id = className;
+ String id;
if (classOccurrencesMap.containsKey(className)) {
// e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
int occurrenceNum = classOccurrencesMap.get(className);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index aca356a..ae06708 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -59,12 +59,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
+import java.util.Stack;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.jar.JarFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
@@ -1907,4 +1909,133 @@ public class Utils {
}
}
}
+
+ /**
+ * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+ * the edge. The mapping contains ids of spouts and bolts.
+ *
+ * @param topology StormTopology to examine.
+ * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+ */
+ private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+ Map<String, Set<String>> edgesOut = new HashMap<>();
+
+ if (topology.get_bolts() != null) {
+ topology.get_bolts().entrySet().forEach(entry -> {
+ if (!Utils.isSystemId(entry.getKey())) {
+ entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+ edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+ });
+ }
+ });
+ }
+ return edgesOut;
+ }
+
+ /**
+ * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+ * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+ *
+ * @param stack used for recursion.
+ * @param edgesOut outbound edge connections, modified when cycle is detected.
+ * @param seen keeps track of component ids that have already been seen.
+ * @param cycles list of cycles seen so far.
+ */
+ private static void findComponentCyclesRecursion(
+ Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+ if (stack.isEmpty()) {
+ return;
+ }
+ String compId1 = stack.peek();
+ if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+ stack.pop();
+ return;
+ }
+ Set<String> children = new HashSet<>(edgesOut.get(compId1));
+ for (String compId2: children) {
+ if (seen.contains(compId2)) {
+ // cycle/diamond detected
+ List<String> possibleCycle = new ArrayList<>();
+ if (compId1.equals(compId2)) {
+ possibleCycle.add(compId2);
+ } else if (edgesOut.get(compId2) != null && edgesOut.get(compId2).contains(compId1)) {
+ possibleCycle.addAll(Arrays.asList(compId1, compId2));
+ } else {
+ List<String> tmp = Collections.list(stack.elements());
+ int prevIdx = tmp.indexOf(compId2);
+ if (prevIdx >= 0) {
+ // cycle (as opposed to diamond)
+ tmp = tmp.subList(prevIdx, tmp.size());
+ tmp.add(compId2);
+ possibleCycle.addAll(tmp);
+ }
+ }
+ if (!possibleCycle.isEmpty()) {
+ cycles.add(possibleCycle);
+ edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+ continue;
+ }
+ }
+ seen.add(compId2);
+ stack.push(compId2);
+ findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+ }
+ stack.pop();
+ }
+
+ /**
+ * Find and return components cycles in the topology graph when starting from spout.
+ * Return a list of cycles. Each cycle may consist of one or more components.
+ * Components that cannot be reached from any of the spouts are ignored.
+ *
+ * @return a List of cycles. Each cycle has a list of component names.
+ *
+ */
+ @VisibleForTesting
+ public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+ List<List<String>> ret = new ArrayList<>();
+ Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+ Set<String> allComponentIds = new HashSet<>();
+ edgesOut.forEach((k, v) -> {
+ allComponentIds.add(k) ;
+ allComponentIds.addAll(v);
+ });
+
+ if (topology.get_spouts_size() == 0) {
+ LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+ return ret;
+ }
+
+ Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+ topology.get_spouts().forEach((spoutId, spout) -> {
+ Stack<String> dfsStack = new Stack<>();
+ dfsStack.push(spoutId);
+ Set<String> seen = new HashSet<>();
+ seen.add(spoutId);
+ findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+ unreachable.removeAll(seen);
+ });
+
+ // warning about unreachable components
+ if (!unreachable.isEmpty()) {
+ LOG.warn("Topology {} contains unreachable components \"{}\"", topoId, String.join(",", unreachable));
+ }
+ return ret;
+ }
+
+ /**
+ * Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
+ *
+ * @param topology StormTopology instance to examine.
+ * @param name Name of the topology, used in exception error message.
+ * @throws InvalidTopologyException if there are cycles, with message describing the cycles encountered.
+ */
+ public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException {
+ List<List<String>> cycles = Utils.findComponentCycles(topology, name);
+ if (!cycles.isEmpty()) {
+ String err = String.format("Topology %s contains cycles in components \"%s\"", name,
+ cycles.stream().map(x -> String.join(",", x)).collect(Collectors.joining(" ; ")));
+ throw new WrappedInvalidTopologyException(err);
+ }
+ }
}
diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
index d42a7b1..27e8987 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
@@ -18,22 +18,34 @@
package org.apache.storm.utils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.ImmutableSet;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class UtilsTest {
+ public static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
@Test
public void isZkAuthenticationConfiguredTopologyTest() {
@@ -77,9 +89,7 @@ public class UtilsTest {
}
private void doParseJvmHeapMemByChildOptsTest(String message, List<String> opts, double expected) {
- Assert.assertEquals(
- message,
- Utils.parseJvmHeapMemByChildOpts(opts, 123.0).doubleValue(), expected, 0);
+ Assert.assertEquals(message, Utils.parseJvmHeapMemByChildOpts(opts, 123.0), expected, 0);
}
@Test
@@ -237,4 +247,242 @@ public class UtilsTest {
assertNotNull(found);
assertEquals(key, found.getVersion());
}
+
+ @Test
+ public void testFindComponentCycles() {
+ class CycleDetectionScenario {
+ final String testName;
+ final String testDescription;
+ final StormTopology topology;
+ final int expectedCycles;
+
+ CycleDetectionScenario() {
+ testName = "dummy";
+ testDescription = "dummy test";
+ topology = null;
+ expectedCycles = 0;
+ }
+
+ CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+ this.testName = testName.replace(' ', '-');
+ this.testDescription = testDescription;
+ this.topology = topology;
+ this.expectedCycles = expectedCycles;
+ }
+
+ public List<CycleDetectionScenario> createTestScenarios() {
+ List<CycleDetectionScenario> ret = new ArrayList<>();
+ int testNo = 0;
+ CycleDetectionScenario s;
+ TopologyBuilder tb;
+
+ // Base case
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+ "Three level component hierarchy with no loops",
+ tb.createTopology(),
+ 0);
+ ret.add(s);
+ }
+
+ // single loop with one bolt
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ // loop bolt 3 (also connect bolt3 to spout 1)
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+ ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+ "Three level component hierarchy with 1 cycle in bolt3",
+ tb.createTopology(),
+ 1));
+ }
+
+ // single loop with three bolts
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+ tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+ tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+ ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+ "Four level component hierarchy with 1 cycle in bolt3,bolt4,bolt5",
+ tb.createTopology(),
+ 1));
+ }
+
+ // two loops with three bolts, and one bolt
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+ tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+ // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+ tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+ tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+ // loop bolt 6 (also connect bolt6 to spout 1)
+ tb.setBolt("bolt6", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt6");
+ ret.add(new CycleDetectionScenario(String.format("(%d) Two Loops", testNo),
+ "Four level component hierarchy with 2 cycles in bolt3,bolt4,bolt5 and bolt6",
+ tb.createTopology(),
+ 2));
+ }
+
+ // complex cycle
+ {
+ // (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B1)
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setSpout("spout2", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+ tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3").shuffleGrouping("spout2");
+ ret.add(new CycleDetectionScenario(String.format("(%d) Complex Loops#1", testNo),
+ "Complex cycle (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B1)",
+ tb.createTopology(),
+ 1));
+ }
+
+ // another complex
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setSpout("spout1", new TestWordSpout(), 10);
+ tb.setSpout("spout2", new TestWordSpout(), 10);
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4").shuffleGrouping("bolt2");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+ tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("spout2");
+ ret.add(new CycleDetectionScenario(String.format("(%d) Complex Loops#2", testNo),
+ "Complex cycle 2 (S1 -> B1 <-> B2 -> B3 ), (S2 -> B4 -> B3), (B4 -> B1)",
+ tb.createTopology(),
+ 1));
+ }
+
+ // no spouts but with loops; the loops wont be detected
+ {
+ testNo++;
+ tb = new TopologyBuilder();
+ tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("bolt4").shuffleGrouping("bolt2");
+ tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+ tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+ tb.setBolt("bolt4", new TestWordCounter(), 10);
+ ret.add(new CycleDetectionScenario(String.format("(%d) No spout complex loops", testNo),
+ "No Spouts, but with cycles (B1 <-> B2 -> B3 ), (B4 -> B3), (B4 -> B1)",
+ tb.createTopology(),
+ 0));
+ }
+
+ // now some randomly generated topologies
+ int maxSpouts = 10;
+ int maxBolts = 30;
+ int randomTopoCnt = 100;
+ for (int iRandTest = 0; iRandTest < randomTopoCnt; iRandTest++) {
+ testNo++;
+ tb = new TopologyBuilder();
+
+ // topology component and connection counts
+ int spoutCnt = ThreadLocalRandom.current().nextInt(0, maxSpouts) + 1;
+ int boltCnt = ThreadLocalRandom.current().nextInt(0, maxBolts) + 1;
+ int spoutToBoltConnectionCnt = ThreadLocalRandom.current().nextInt(spoutCnt * boltCnt) + 1;
+ int boltToBoltConnectionCnt = ThreadLocalRandom.current().nextInt(boltCnt * boltCnt) + 1;
+
+ Map<Integer, BoltDeclarer> boltDeclarers = new HashMap<>();
+ for (int iSpout = 0 ; iSpout < spoutCnt ; iSpout++) {
+ tb.setSpout("spout" + iSpout, new TestWordSpout(), 10);
+ }
+ for (int iBolt = 0 ; iBolt < boltCnt ; iBolt++) {
+ boltDeclarers.put(iBolt, tb.setBolt("bolt" + iBolt, new TestWordCounter(), 10));
+ }
+ // spout to bolt connections
+ for (int i = 0 ; i < spoutToBoltConnectionCnt ; i++) {
+ int iSpout = ThreadLocalRandom.current().nextInt(0, spoutCnt);
+ int iBolt = ThreadLocalRandom.current().nextInt(0, boltCnt);
+ boltDeclarers.get(iBolt).shuffleGrouping("spout" + iSpout);
+ }
+ // bolt to bolt connections
+ for (int i = 0 ; i < boltToBoltConnectionCnt ; i++) {
+ int iBolt1 = ThreadLocalRandom.current().nextInt(0, boltCnt);
+ int iBolt2 = ThreadLocalRandom.current().nextInt(0, boltCnt);
+ boltDeclarers.get(iBolt2).shuffleGrouping("bolt" + iBolt1);
+ }
+ ret.add(new CycleDetectionScenario(String.format("(%d) Random Topo#%d", testNo, iRandTest),
+ String.format("Random topology #%d, spouts=%d, bolts=%d, connections: fromSpouts=%d/fromBolts=%d",
+ iRandTest, spoutCnt, boltCnt, spoutToBoltConnectionCnt, boltToBoltConnectionCnt),
+ tb.createTopology(),
+ -1));
+ }
+
+ return ret;
+ }
+ }
+ List<String> testFailures = new ArrayList<>();
+
+ new CycleDetectionScenario().createTestScenarios().forEach(x -> {
+ LOG.info("==================== Running Test Scenario: {} =======================", x.testName);
+ LOG.info("{}: {}", x.testName, x.testDescription);
+
+ List<List<String>> loops = Utils.findComponentCycles(x.topology, x.testName);
+ if (x.expectedCycles >= 0) {
+ if (!loops.isEmpty()) {
+ LOG.info("{} detected loops are \"{}\"", x.testName,
+ loops.stream()
+ .map(y -> String.join(",", y))
+ .collect(Collectors.joining(" ; "))
+ );
+ }
+ if (loops.size() != x.expectedCycles) {
+ testFailures.add(
+ String.format("Test \"%s\" failed, detected cycles=%d does not match expected=%d for \"%s\"",
+ x.testName, loops.size(), x.expectedCycles, x.testDescription));
+ if (!loops.isEmpty()) {
+ testFailures.add(
+ String.format("\t\tdetected loops are \"%s\"",
+ loops.stream()
+ .map(y -> String.join(",", y))
+ .collect(Collectors.joining(" ; "))
+ )
+ );
+ }
+ }
+ } else {
+ // these are random topologies, with indeterminate number of loops
+ LOG.info("{} detected loop count is \"{}\"", x.testName, loops.size());
+ }
+ });
+ if (!testFailures.isEmpty()) {
+ fail(String.join("\n", testFailures));
+ }
+ }
}