You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/04 14:23:24 UTC

[storm] branch master updated: [STORM-3685] Detect and prevent cycles when Topology is submitted. (#3322)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 8399edc  [STORM-3685] Detect and prevent cycles when Topology is submitted. (#3322)
8399edc is described below

commit 8399edcfbb06b484f7b06a08d4d75e0a2c8d4e86
Author: Bipin Prasad <bi...@yahoo.com>
AuthorDate: Fri Sep 4 09:23:06 2020 -0500

    [STORM-3685] Detect and prevent cycles when Topology is submitted. (#3322)
---
 .../src/jvm/org/apache/storm/StormSubmitter.java   |  33 +--
 .../jvm/org/apache/storm/daemon/StormCommon.java   |  13 +-
 .../src/jvm/org/apache/storm/utils/Utils.java      | 131 +++++++++++
 .../test/jvm/org/apache/storm/utils/UtilsTest.java | 254 ++++++++++++++++++++-
 4 files changed, 407 insertions(+), 24 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index f782272..7a6acbe 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.storm.dependency.DependencyPropertiesParser;
 import org.apache.storm.dependency.DependencyUploader;
 import org.apache.storm.generated.AlreadyAliveException;
@@ -131,7 +132,7 @@ public class StormSubmitter {
      */
     public static boolean pushCredentials(String name, Map<String, Object> topoConf, Map<String, String> credentials, String expectedUser)
         throws AuthorizationException, NotAliveException, InvalidTopologyException {
-        topoConf = new HashMap(topoConf);
+        topoConf = new HashMap<>(topoConf);
         topoConf.putAll(Utils.readCommandLineOpts());
         Map<String, Object> conf = Utils.readStormConfig();
         conf.putAll(topoConf);
@@ -166,7 +167,7 @@ public class StormSubmitter {
      * @throws AlreadyAliveException    if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws AuthorizationException   if authorization is failed
-     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
      */
     public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology)
         throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -183,7 +184,7 @@ public class StormSubmitter {
      * @throws AlreadyAliveException    if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws AuthorizationException   if authorization is failed
-     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
      */
     public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts)
         throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
@@ -197,11 +198,11 @@ public class StormSubmitter {
      * @param topoConf         the topology-specific configuration. See {@link Config}.
      * @param topology         the processing to execute.
      * @param opts             to manipulate the starting of the topology
-     * @param progressListener to track the progress of the jar upload process
+     * @param progressListener to track the progress of the jar upload process {@link ProgressListener}
      * @throws AlreadyAliveException    if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws AuthorizationException   if authorization is failed
-     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
      */
     @SuppressWarnings("unchecked")
     public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts,
@@ -215,7 +216,7 @@ public class StormSubmitter {
      *
      * @param asUser The user as which this topology should be submitted.
      * @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. validateConfs validates confs
-     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
      */
     public static void submitTopologyAs(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts,
                                         ProgressListener progressListener, String asUser)
@@ -227,13 +228,19 @@ public class StormSubmitter {
         if (!Utils.isValidConf(topoConf)) {
             throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
         }
-        topoConf = new HashMap(topoConf);
+        topoConf = new HashMap<>(topoConf);
         topoConf.putAll(Utils.readCommandLineOpts());
         Map<String, Object> conf = Utils.readStormConfig();
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf);
+
+        try {
+            Utils.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn("", ex);
+        }
 
         Map<String, String> passedCreds = new HashMap<>();
         if (opts != null) {
@@ -355,7 +362,7 @@ public class StormSubmitter {
 
     /**
      * Invoke submitter hook.
-     * @thorws SubmitterHookException This is thrown when any Exception occurs during initialization or invocation of registered {@link
+     * @throws SubmitterHookException This is thrown when any Exception occurs during initialization or invocation of registered {@link
      *     ISubmitterHook}
      */
     private static void invokeSubmitterHook(String name, String asUser, Map<String, Object> topoConf, StormTopology topology) {
@@ -407,7 +414,7 @@ public class StormSubmitter {
      * @throws AlreadyAliveException    if a topology with this name is already running
      * @throws InvalidTopologyException if an invalid topology was submitted
      * @throws AuthorizationException   if authorization is failed
-     * @thorws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
+     * @throws SubmitterHookException if any Exception occurs during initialization or invocation of registered {@link ISubmitterHook}
      */
     public static void submitTopologyWithProgressBar(String name, Map<String, Object> topoConf, StormTopology topology,
                                                      SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException,
@@ -444,10 +451,6 @@ public class StormSubmitter {
         }
     }
 
-    private static String submitJar(Map<String, Object> conf, ProgressListener listener) {
-        return submitJar(conf, System.getProperty("storm.jar"), listener);
-    }
-
     /**
      * Submit jar file.
      *
@@ -524,7 +527,7 @@ public class StormSubmitter {
         }
     }
 
-    private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException,
+    private static void validateConfs(Map<String, Object> topoConf) throws IllegalArgumentException,
         InvalidTopologyException, AuthorizationException {
         ConfigValidation.validateTopoConf(topoConf);
         Utils.validateTopologyBlobStoreMap(topoConf);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index 15efada..4d11431 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.Thrift;
@@ -91,7 +92,7 @@ public class StormCommon {
         }
     }
 
-    private static Set<String> validateIds(Map<String, ? extends Object> componentMap) throws InvalidTopologyException {
+    private static Set<String> validateIds(Map<String, ?> componentMap) throws InvalidTopologyException {
         Set<String> keys = componentMap.keySet();
         for (String id : keys) {
             if (Utils.isSystemId(id)) {
@@ -329,7 +330,7 @@ public class StormCommon {
 
     public static Map<GlobalStreamId, Grouping> eventLoggerInputs(StormTopology topology) {
         Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
-        Set<String> allIds = new HashSet<String>();
+        Set<String> allIds = new HashSet<>();
         allIds.addAll(topology.get_bolts().keySet());
         allIds.addAll(topology.get_spouts().keySet());
 
@@ -374,14 +375,14 @@ public class StormCommon {
 
         List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
         if (registerInfo != null) {
-            Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
+            Map<String, Integer> classOccurrencesMap = new HashMap<>();
             for (Map<String, Object> info : registerInfo) {
                 String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
                 Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
                 Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(
                     TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
                 Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
-                Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
+                Map<String, Object> metricsConsumerConf = new HashMap<>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
                 List<String> whitelist = (List<String>) info.get(
                     TOPOLOGY_METRICS_CONSUMER_WHITELIST);
@@ -398,7 +399,7 @@ public class StormCommon {
                 Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                                boltInstance, null, phintNum, metricsConsumerConf);
 
-                String id = className;
+                String id;
                 if (classOccurrencesMap.containsKey(className)) {
                     // e.g. [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
                     int occurrenceNum = classOccurrencesMap.get(className);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index aca356a..ae06708 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -59,12 +59,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
+import java.util.Stack;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.jar.JarFile;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
@@ -1907,4 +1909,133 @@ public class Utils {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that a spout can be the source but never the target
+     * of an edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map from each SpoutId/BoltId that feeds at least one bolt to the set of BoltIds on its outbound edges.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a depth-first recursion. A component cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting the same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2) != null && edgesOut.get(compId2).contains(compId1)) {
+                    possibleCycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        // cycle (as opposed to diamond)
+                        tmp = tmp.subList(prevIdx, tmp.size());
+                        tmp.add(compId2);
+                        possibleCycle.addAll(tmp);
+                    }
+                }
+                if (!possibleCycle.isEmpty()) {
+                    cycles.add(possibleCycle);
+                    edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                    continue;
+                }
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return component cycles in the topology graph, starting the search from each spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    @VisibleForTesting
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            return ret;
+        }
+
+        Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+        topology.get_spouts().forEach((spoutId, spout)  -> {
+            Stack<String> dfsStack = new Stack<>();
+            dfsStack.push(spoutId);
+            Set<String> seen = new HashSet<>();
+            seen.add(spoutId);
+            findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+            unreachable.removeAll(seen);
+        });
+
+        // warning about unreachable components
+        if (!unreachable.isEmpty()) {
+            LOG.warn("Topology {} contains unreachable components \"{}\"", topoId, String.join(",", unreachable));
+        }
+        return ret;
+    }
+
+    /**
+     * Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
+     *
+     * @param topology StormTopology instance to examine.
+     * @param name Name of the topology, used in exception error message.
+     * @throws InvalidTopologyException if there are cycles, with message describing the cycles encountered.
+     */
+    public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException {
+        List<List<String>> cycles = Utils.findComponentCycles(topology, name);
+        if (!cycles.isEmpty()) {
+            String err = String.format("Topology %s contains cycles in components \"%s\"", name,
+                    cycles.stream().map(x -> String.join(",", x)).collect(Collectors.joining(" ; ")));
+            throw new WrappedInvalidTopologyException(err);
+        }
+    }
 }
diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
index d42a7b1..27e8987 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
@@ -18,22 +18,34 @@
 
 package org.apache.storm.utils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
 import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.shade.com.google.common.collect.ImmutableList;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.shade.com.google.common.collect.ImmutableSet;
+import org.apache.storm.testing.TestWordCounter;
+import org.apache.storm.testing.TestWordSpout;
 import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
 public class UtilsTest {
+    public static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
 
     @Test
     public void isZkAuthenticationConfiguredTopologyTest() {
@@ -77,9 +89,7 @@ public class UtilsTest {
     }
 
     private void doParseJvmHeapMemByChildOptsTest(String message, List<String> opts, double expected) {
-        Assert.assertEquals(
-            message,
-            Utils.parseJvmHeapMemByChildOpts(opts, 123.0).doubleValue(), expected, 0);
+        Assert.assertEquals(message, Utils.parseJvmHeapMemByChildOpts(opts, 123.0), expected, 0);
     }
 
     @Test
@@ -237,4 +247,242 @@ public class UtilsTest {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName.replace(' ', '-');
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            public List<CycleDetectionScenario> createTestScenarios() {
+                List<CycleDetectionScenario> ret = new ArrayList<>();
+                int testNo = 0;
+                CycleDetectionScenario s;
+                TopologyBuilder tb;
+
+                // Base case
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+                            "Three level component hierarchy with no loops",
+                            tb.createTopology(),
+                            0);
+                    ret.add(s);
+                }
+
+                // single loop with one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3  (also connect bolt3 to spout 1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Three level component hierarchy with 1 cycle in bolt3",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // single loop with three bolts
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3,bolt4,bolt5",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // two loops with three bolts, and one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    // loop bolt 6  (also connect bolt6 to spout 1)
+                    tb.setBolt("bolt6", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt6");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Two Loops", testNo),
+                            "Four level component hierarchy with 2 cycles in bolt3,bolt4,bolt5 and bolt6",
+                            tb.createTopology(),
+                            2));
+                }
+
+                // complex cycle
+                {
+                    // (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B1)
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setSpout("spout2", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3").shuffleGrouping("spout2");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Complex Loops#1", testNo),
+                            "Complex cycle (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B1)",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // another complex
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setSpout("spout2", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4").shuffleGrouping("bolt2");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("spout2");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Complex Loops#2", testNo),
+                            "Complex cycle 2 (S1 -> B1 <-> B2 -> B3 ), (S2 -> B4 -> B3), (B4 -> B1)",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // no spouts but with loops; the loops won't be detected
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("bolt4").shuffleGrouping("bolt2");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10);
+                    ret.add(new CycleDetectionScenario(String.format("(%d) No spout complex loops", testNo),
+                            "No Spouts, but with cycles (B1 <-> B2 -> B3 ), (B4 -> B3), (B4 -> B1)",
+                            tb.createTopology(),
+                            0));
+                }
+
+                // now some randomly generated topologies
+                int maxSpouts = 10;
+                int maxBolts = 30;
+                int randomTopoCnt = 100;
+                for (int iRandTest = 0; iRandTest < randomTopoCnt; iRandTest++) {
+                    testNo++;
+                    tb = new TopologyBuilder();
+
+                    // topology component and connection counts
+                    int spoutCnt = ThreadLocalRandom.current().nextInt(0, maxSpouts) + 1;
+                    int boltCnt = ThreadLocalRandom.current().nextInt(0, maxBolts) + 1;
+                    int spoutToBoltConnectionCnt = ThreadLocalRandom.current().nextInt(spoutCnt * boltCnt) + 1;
+                    int boltToBoltConnectionCnt =  ThreadLocalRandom.current().nextInt(boltCnt * boltCnt) + 1;
+
+                    Map<Integer, BoltDeclarer> boltDeclarers = new HashMap<>();
+                    for (int iSpout = 0 ; iSpout  < spoutCnt ; iSpout++) {
+                        tb.setSpout("spout" + iSpout, new TestWordSpout(), 10);
+                    }
+                    for (int iBolt = 0 ; iBolt  < boltCnt ; iBolt++) {
+                        boltDeclarers.put(iBolt, tb.setBolt("bolt" + iBolt, new TestWordCounter(), 10));
+                    }
+                    // spout to bolt connections
+                    for (int i = 0 ; i < spoutToBoltConnectionCnt ; i++) {
+                        int iSpout = ThreadLocalRandom.current().nextInt(0, spoutCnt);
+                        int iBolt = ThreadLocalRandom.current().nextInt(0, boltCnt);
+                        boltDeclarers.get(iBolt).shuffleGrouping("spout" + iSpout);
+                    }
+                    // bolt to bolt connections
+                    for (int i = 0 ; i < boltToBoltConnectionCnt ; i++) {
+                        int iBolt1 = ThreadLocalRandom.current().nextInt(0, boltCnt);
+                        int iBolt2 = ThreadLocalRandom.current().nextInt(0, boltCnt);
+                        boltDeclarers.get(iBolt2).shuffleGrouping("bolt" + iBolt1);
+                    }
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Random Topo#%d", testNo, iRandTest),
+                            String.format("Random topology #%d, spouts=%d, bolts=%d, connections: fromSpouts=%d/fromBolts=%d",
+                                    iRandTest, spoutCnt, boltCnt, spoutToBoltConnectionCnt, boltToBoltConnectionCnt),
+                            tb.createTopology(),
+                            -1));
+                }
+
+                return ret;
+            }
+        }
+        List<String> testFailures = new ArrayList<>();
+
+        new CycleDetectionScenario().createTestScenarios().forEach(x -> {
+            LOG.info("==================== Running Test Scenario: {} =======================", x.testName);
+            LOG.info("{}: {}", x.testName, x.testDescription);
+
+            List<List<String>> loops = Utils.findComponentCycles(x.topology, x.testName);
+            if (x.expectedCycles >= 0) {
+                if (!loops.isEmpty()) {
+                    LOG.info("{} detected loops are \"{}\"", x.testName,
+                            loops.stream()
+                                    .map(y -> String.join(",", y))
+                                    .collect(Collectors.joining(" ; "))
+                    );
+                }
+                if (loops.size() != x.expectedCycles) {
+                    testFailures.add(
+                            String.format("Test \"%s\" failed, detected cycles=%d does not match expected=%d for \"%s\"",
+                                    x.testName, loops.size(), x.expectedCycles, x.testDescription));
+                    if (!loops.isEmpty()) {
+                        testFailures.add(
+                                String.format("\t\tdetected loops are \"%s\"",
+                                        loops.stream()
+                                                .map(y -> String.join(",", y))
+                                                .collect(Collectors.joining(" ; "))
+                                )
+                        );
+                    }
+                }
+            } else {
+                // these are random topologies, with indeterminate number of loops
+                LOG.info("{} detected loop count is \"{}\"", x.testName, loops.size());
+            }
+        });
+        if (!testFailures.isEmpty()) {
+            fail(String.join("\n", testFailures));
+        }
+    }
 }