Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2020/08/07 17:24:38 UTC

[GitHub] [storm] bipinprasad opened a new pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

bipinprasad opened a new pull request #3322:
URL: https://github.com/apache/storm/pull/3322


   ## What is the purpose of the change
   
   *A topology is expected to be a Directed Acyclic Graph (DAG). Cycles in the component flow can cause unexpected behavior, for example a deadlock when one of the components in the loop signals back-pressure. Cycles can also interfere with properly detecting proximity when scheduling.*
   
   ## How was the change tested
   
   *Added a new test that creates topologies with and without cycles and verifies that the cycles are detected.*
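
For readers following the thread, a minimal sketch of cycle detection on a plain adjacency map may help. It is not the code in this PR: it uses a standard three-state depth-first search rather than the stack-and-seen approach in the diff, and the class name `CycleSketch`, the method `hasCycle`, and the sample component ids are assumptions made for illustration. It needs Java 9 or later for `Map.of` and `Set.of`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Storm: reports whether a directed graph has a cycle.
class CycleSketch {
    private enum State { IN_PROGRESS, DONE }

    // edgesOut maps a component id to the ids it emits to.
    static boolean hasCycle(Map<String, Set<String>> edgesOut) {
        Map<String, State> state = new HashMap<>();
        for (String start : edgesOut.keySet()) {
            if (visit(start, edgesOut, state)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String node, Map<String, Set<String>> edgesOut,
                                 Map<String, State> state) {
        State s = state.get(node);
        if (s == State.IN_PROGRESS) {
            return true;   // back edge: node is on the current DFS path
        }
        if (s == State.DONE) {
            return false;  // already fully explored (for example a diamond), not a cycle
        }
        state.put(node, State.IN_PROGRESS);
        for (String next : edgesOut.getOrDefault(node, Set.of())) {
            if (visit(next, edgesOut, state)) {
                return true;
            }
        }
        state.put(node, State.DONE);
        return false;
    }

    public static void main(String[] args) {
        // Diamond: spout1 -> bolt1 -> bolt3 and spout1 -> bolt2 -> bolt3, no cycle.
        Map<String, Set<String>> dag = Map.of(
                "spout1", Set.of("bolt1", "bolt2"),
                "bolt1", Set.of("bolt3"),
                "bolt2", Set.of("bolt3"));
        // Loop: bolt3 -> bolt4 -> bolt5 -> bolt3.
        Map<String, Set<String>> loop = Map.of(
                "spout1", Set.of("bolt3"),
                "bolt3", Set.of("bolt4"),
                "bolt4", Set.of("bolt5"),
                "bolt5", Set.of("bolt3"));
        System.out.println("dag has cycle: " + hasCycle(dag));
        System.out.println("loop has cycle: " + hasCycle(loop));
    }
}
```

Tracking "in progress" separately from "done" is what lets the sketch tell a diamond (two paths reaching the same bolt) apart from a true cycle.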


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470061568



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());

Review comment:
       fixed
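
The review comment being answered here is not shown in the thread. Assuming it concerned the discarded result of `tmp.subList(...)` on the quoted line, the sketch below shows why the result has to be kept: `List.subList` returns a view and does not trim the list it is called on. `SubListSketch` and `cyclePath` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm.
class SubListSketch {
    // Returns the part of the DFS path that forms the cycle, closed by
    // repeating the component that was seen again. Empty if it is not on the path.
    static List<String> cyclePath(List<String> path, String repeated) {
        int idx = path.indexOf(repeated);
        if (idx < 0) {
            return new ArrayList<>();
        }
        // subList returns a view; copy it so the caller's path is left untouched.
        List<String> cycle = new ArrayList<>(path.subList(idx, path.size()));
        cycle.add(repeated);
        return cycle;
    }

    public static void main(String[] args) {
        List<String> path = List.of("spout1", "bolt3", "bolt4", "bolt5");
        System.out.println(cyclePath(path, "bolt3"));
    }
}
```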







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482346963



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       @bipinprasad   
   Sorry for not being clear in the first comment. What do you think about moving it to Utils? I am trying not to expose too many public methods. Also, StormCommon is `org.apache.storm.daemon.StormCommon`, while StormSubmitter is entirely on the client side, so I feel like Utils is a better place for this.







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482258004



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);
+            System.out.println(ex.get_msg());

Review comment:
       OK, removed the `System.out.println` call.







[GitHub] [storm] bipinprasad closed pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad closed pull request #3322:
URL: https://github.com/apache/storm/pull/3322


   





[GitHub] [storm] bipinprasad commented on pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on pull request #3322:
URL: https://github.com/apache/storm/pull/3322#issuecomment-671392318


   Closing and reopening to trigger a rebuild. Are the build failures due to license file failures on JDK 11?





[GitHub] [storm] bipinprasad closed pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad closed pull request #3322:
URL: https://github.com/apache/storm/pull/3322


   





[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480452587



##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +247,167 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName.replace(' ', '-');
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            public List<CycleDetectionScenario> createTestScenarios() {
+                List<CycleDetectionScenario> ret = new ArrayList<>();
+                int testNo = 0;
+                CycleDetectionScenario s;
+                TopologyBuilder tb;
+
+                // Base case
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+                            "Three level component hierarchy with no loops",
+                            tb.createTopology(),
+                            0);
+                    ret.add(s);
+                }
+
+                // single loop with one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3  (also connect bolt3 to spout 1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // single loop with three bolts
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3,bolt4,bolt5",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // two loops with three bolts, and one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    // loop bolt 6  (also connect bolt6 to spout 1)
+                    tb.setBolt("bolt6", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt6");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Two Loops", testNo),
+                            "Four level component hierarchy with 2 cycles in bolt3,bolt4,bolt5 and bolt6",
+                            tb.createTopology(),
+                            2));
+                }
+
+                // complex cycle
+                {
+                    // (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B2)

Review comment:
       fixed







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482346087



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       Right. So the log will look like this:
   ```
   19:22:23.981 [main] WARN  o.a.s.StormSubmitter - Topology wc contains cycles in components "count,split"
   org.apache.storm.generated.InvalidTopologyException: null
   	at org.apache.storm.daemon.StormCommon.validateCycleFree(StormCommon.java:596) ~[storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:244) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:214) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:177) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.topology.ConfigurableTopology.submit(ConfigurableTopology.java:119) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.starter.WordCountTopology.run(WordCountTopology.java:58) [storm-starter-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.topology.ConfigurableTopology.start(ConfigurableTopology.java:68) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	at org.apache.storm.starter.WordCountTopology.main(WordCountTopology.java:36) [storm-starter-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   ```
   
   
   We can probably use [WrappedInvalidTopologyException](https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/utils/WrappedInvalidTopologyException.java) inside the `validateCycleFree` method so that the stack trace makes more sense. What do you think?
   It would look like this:
   ```
   org.apache.storm.utils.WrappedInvalidTopologyException: Topology wc2 contains cycles in components "count,split"
   	at org.apache.storm.daemon.StormCommon.validateCycleFree(StormCommon.java:596) ~[storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
   	.....
   ```
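
The `InvalidTopologyException: null` line in the first trace is consistent with an exception that keeps its text in its own field instead of passing it to `Throwable`, so that `getMessage()` returns `null`. The sketch below reproduces that effect with stand-in classes. `ThriftStyleException` and `WrappedStyleException` are hypothetical names, not Storm's classes, and the `getMessage()` override is an assumption about how a wrapper could expose the text.

```java
// Hypothetical demo class, placed first so the file can be run directly.
class ExceptionMessageSketch {
    public static void main(String[] args) {
        String text = "Topology wc contains cycles in components \"count,split\"";
        // getMessage() is what loggers typically print next to the class name.
        System.out.println(new ThriftStyleException(text).getMessage());
        System.out.println(new WrappedStyleException(text).getMessage());
    }
}

// Stand-in for a generated exception that stores its text in a field only.
class ThriftStyleException extends Exception {
    private final String msg;

    ThriftStyleException(String msg) {
        super();            // the text is NOT passed to Throwable
        this.msg = msg;
    }

    String get_msg() {
        return msg;
    }
}

// Stand-in wrapper that exposes the field through getMessage().
class WrappedStyleException extends ThriftStyleException {
    WrappedStyleException(String msg) {
        super(msg);
    }

    @Override
    public String getMessage() {
        return get_msg();
    }
}
```

With the override in place, the text appears on the exception line of the stack trace, and callers no longer need to log `get_msg()` separately.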
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480402134



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       I feel like this is not the right place to detect cycles.  We should probably put it in another function, like 
   `StormCommon.validateBasic`
   
   https://git.vzbuilders.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L156
   
   or create a new method.

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.

Review comment:
       Comments here and below need to be updated since the mapping only contains bolts.

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    possibleCycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        // cycle (as opposed to diamond)
+                        tmp = tmp.subList(prevIdx, tmp.size());
+                        tmp.add(compId2);
+                        possibleCycle.addAll(tmp);
+                    }
+                }
+                if (!possibleCycle.isEmpty()) {
+                    cycles.add(possibleCycle);
+                    edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                    continue;
+                }
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));

Review comment:
       A topology with no spouts does not necessarily have cycles. We should handle the two cases separately.
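
One way to keep the two conditions apart is to give each its own outcome, so that a missing spout is never reported as a cycle. This is only a sketch of the idea raised in the comment above: `SpoutCheckSketch`, `Problem`, and `classify` are hypothetical names, and the PR may have resolved it differently.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Storm.
class SpoutCheckSketch {
    // Distinct outcomes, so "no spouts" and "has cycles" cannot be confused.
    enum Problem { NONE, NO_SPOUTS, HAS_CYCLES }

    static Problem classify(Set<String> spoutIds, List<List<String>> cycles) {
        if (spoutIds.isEmpty()) {
            return Problem.NO_SPOUTS;   // nothing to start the traversal from
        }
        return cycles.isEmpty() ? Problem.NONE : Problem.HAS_CYCLES;
    }

    public static void main(String[] args) {
        System.out.println(classify(Set.of(), List.of()));
        System.out.println(classify(Set.of("spout1"), List.of(List.of("bolt3", "bolt3"))));
        System.out.println(classify(Set.of("spout1"), List.of()));
    }
}
```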







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470122901



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());
+                    }
+                    tmp.add(compId2);
+                    cycle.addAll(tmp);
+                }
+                cycles.add(cycle);
+                edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));
+            return ret;
+        }
+
+        Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+        topology.get_spouts().forEach((spoutId, spout)  -> {
+            Stack<String> dfsStack = new Stack<>();
+            dfsStack.push(spoutId);
+            Set<String> seen = new HashSet<>();
+            seen.add(spoutId);
+            findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+            unreachable.removeAll(seen);
+        });
+
+        // warning about unreachable components
+        if (!unreachable.isEmpty()) {
+            LOG.warn("Topology {} contains unreachable components \"{}\"", topoId, String.join(",", unreachable));
+        }
+
+        // detected cycles
+        if (!ret.isEmpty()) {
+            LOG.error("Topology {} contains cycles {}", topoId,

Review comment:
       removed







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480446637



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.

Review comment:
       Key can be bolt or spout. Value contains bolt ids only.
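
The shape described above (keys are spout or bolt ids, values are bolt ids only) can be sketched without Storm on the classpath. This is only an illustration: `ForwardGraphSketch`, `forwardEdges`, and the plain `boltInputs` map are hypothetical stand-ins for `getStormTopologyForwardGraph` and `StormTopology`, not the code in this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in: plain maps replace StormTopology so the sketch runs on its own.
class ForwardGraphSketch {

    /**
     * Invert "bolt -> its inputs" into "source -> consuming bolts".
     *
     * @param boltInputs bolt id mapped to the ids of the components (spouts or bolts) it reads from.
     * @return source id (spout or bolt) mapped to the set of bolt ids that consume from it.
     */
    static Map<String, Set<String>> forwardEdges(Map<String, List<String>> boltInputs) {
        Map<String, Set<String>> edgesOut = new HashMap<>();
        boltInputs.forEach((boltId, sources) -> {
            for (String sourceId : sources) {
                // Only bolts declare inputs, so only bolt ids ever land in a value set.
                edgesOut.computeIfAbsent(sourceId, x -> new HashSet<>()).add(boltId);
            }
        });
        return edgesOut;
    }

    public static void main(String[] args) {
        Map<String, List<String>> boltInputs = new HashMap<>();
        boltInputs.put("bolt1", Arrays.asList("spout1"));
        boltInputs.put("bolt2", Arrays.asList("spout1", "bolt1"));
        System.out.println(forwardEdges(boltInputs));
    }
}
```

A spout can only appear as a key here because nothing ever lists a spout as a consumer.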







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470074373



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());
+                    }
+                    tmp.add(compId2);
+                    cycle.addAll(tmp);
+                }
+                cycles.add(cycle);
+                edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {

Review comment:
       This specific result will be fixed by using the value returned by tmp.subList.
   However, I am wondering whether this will misclassify diamonds as cycles (undesired), in which case the code should be changed to require that #1974 be true for cycles.
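
For context on the `subList` point: in the quoted diff the call `tmp.subList(prevIdx, tmp.size());` discards its return value, and `List.subList` returns a view without changing the list it is called on. A minimal sketch of the intended trimming follows; `SubListSketch` and `cyclePath` are hypothetical names, and returning an empty list for the not-on-path case is an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not the PR's code: shows why the subList result must be kept.
class SubListSketch {

    /** Return the path from the first occurrence of repeated to the end, closed with repeated. */
    static List<String> cyclePath(List<String> stackElements, String repeated) {
        List<String> tmp = new ArrayList<>(stackElements);
        int prevIdx = tmp.indexOf(repeated);
        if (prevIdx < 0) {
            // Assumption: a component that is not on the current path does not close a cycle.
            return new ArrayList<>();
        }
        // subList returns a view and leaves tmp untouched, so the result is copied and used.
        List<String> cycle = new ArrayList<>(tmp.subList(prevIdx, tmp.size()));
        cycle.add(repeated);
        return cycle;
    }

    public static void main(String[] args) {
        List<String> stack = Arrays.asList("spout1", "a", "b", "c");
        System.out.println(cyclePath(stack, "b")); // prints [b, c, b]
    }
}
```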







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482121654



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       Moving it to Utils is also okay, like 
   `Utils.validateTopologyBlobStoreMap`
   `Utils.validateTopologyName`
   etc.
   







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480425651



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {

Review comment:
       Sounds good. I would add this test case too.







[GitHub] [storm] bipinprasad commented on pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on pull request #3322:
URL: https://github.com/apache/storm/pull/3322#issuecomment-672260764


   close/reopen to rebuild.





[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470047775



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {

Review comment:
       I wondered about the code in Topology.getComponents(), but it seems superfluous. I will remove the get_inputs call on spouts and the isSystemId check.







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482346963



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       @bipinprasad   
   Sorry for not being clear in the first comment. What do you think about moving it to Utils?







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482116135



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);
+            System.out.println(ex.get_msg());

Review comment:
       We should remove this

##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -51,6 +52,7 @@
 import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.net.ProgressListener;

Review comment:
       Is this used anywhere?

##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       Will `ex` already contain `ex.get_msg()`? If so, we will see repeated messages, which should be avoided.
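
To illustrate the concern, here is a minimal sketch using a plain `Exception` and a `PrintWriter` instead of SLF4J and `InvalidTopologyException`. It assumes the logger writes the message followed by the stack trace, and that the exception's `toString()` includes the same text; whether that holds for `InvalidTopologyException` is exactly the open question above. `DuplicateMessageSketch`, `render`, and `count` are hypothetical names.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical demo: mimics a logger that prints the message and then the stack trace.
class DuplicateMessageSketch {

    /** Render a log entry as "message" followed by the throwable's stack trace. */
    static String render(String message, Throwable t) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        pw.println(message);
        t.printStackTrace(pw); // first line is t.toString(), which includes getMessage()
        pw.flush();
        return sw.toString();
    }

    /** Count non-overlapping occurrences of needle in text. */
    static int count(String text, String needle) {
        int n = 0;
        for (int i = text.indexOf(needle); i >= 0; i = text.indexOf(needle, i + needle.length())) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        String msg = "Topology demo contains cycles";
        Exception ex = new Exception(msg);
        // Passing the exception's own message as the log message repeats the text.
        System.out.println(count(render(ex.getMessage(), ex), msg));
        // A distinct log message avoids the repetition.
        System.out.println(count(render("Topology validation failed", ex), msg));
    }
}
```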







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470127212



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());
+                    }
+                    tmp.add(compId2);
+                    cycle.addAll(tmp);
+                }
+                cycles.add(cycle);
+                edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {

Review comment:
       Added the complex cycle(s) test case. Diamond connections are now excluded from loops.
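
The diamond versus cycle distinction can be sketched with a generic depth-first search that tracks which components are on the current path. This is a textbook technique shown under simplifying assumptions, not the Stack-based implementation in this PR; `CycleVsDiamondSketch`, `hasCycle`, and `visit` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a component seen again is a cycle only if it is still on the current path.
class CycleVsDiamondSketch {

    static boolean hasCycle(Map<String, Set<String>> edgesOut, String start) {
        return visit(start, edgesOut, new HashSet<>(), new HashSet<>());
    }

    private static boolean visit(String node, Map<String, Set<String>> edgesOut,
                                 Set<String> onPath, Set<String> done) {
        if (onPath.contains(node)) {
            return true;  // back edge to an ancestor: a real cycle
        }
        if (done.contains(node)) {
            return false; // reached earlier through another branch: a diamond, not a cycle
        }
        onPath.add(node);
        for (String child : edgesOut.getOrDefault(node, Collections.emptySet())) {
            if (visit(child, edgesOut, onPath, done)) {
                return true;
            }
        }
        onPath.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> diamond = new HashMap<>();
        diamond.put("spout1", new HashSet<>(Arrays.asList("a", "b")));
        diamond.put("a", new HashSet<>(Arrays.asList("c")));
        diamond.put("b", new HashSet<>(Arrays.asList("c")));
        System.out.println(hasCycle(diamond, "spout1")); // prints false

        Map<String, Set<String>> loop = new HashMap<>();
        loop.put("spout1", new HashSet<>(Arrays.asList("a")));
        loop.put("a", new HashSet<>(Arrays.asList("b")));
        loop.put("b", new HashSet<>(Arrays.asList("a")));
        System.out.println(hasCycle(loop, "spout1")); // prints true
    }
}
```

Keeping a separate "on path" set is what lets component `c` in the diamond be visited from both `a` and `b` without being reported.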







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482256332



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       Do you have an example of the output that I can take a look at?







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480464504



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    possibleCycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        // cycle (as opposed to diamond)
+                        tmp = tmp.subList(prevIdx, tmp.size());
+                        tmp.add(compId2);
+                        possibleCycle.addAll(tmp);
+                    }
+                }
+                if (!possibleCycle.isEmpty()) {
+                    cycles.add(possibleCycle);
+                    edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                    continue;
+                }
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));

Review comment:
       Changed to return no cycle.







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482230208



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);
+            System.out.println(ex.get_msg());

Review comment:
       Did we not want to warn the user when submitting?







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r481446072



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       Created new method StormCommon.validateCycleFree







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r469539586



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {

Review comment:
       The `topology` here must be a userTopology (if it is a system topology, it will have loops because of ackers).
   So I think we don't need to check `Utils.isSystemId`, and we don't need to call `get_inputs` on spouts since their inputs will be empty. (see https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L162-L166)

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());

Review comment:
       It looks like this needs to be `tmp = tmp.subList(prevIdx, tmp.size());`. `subList` returns a view and does not modify `tmp`, so as written the result is discarded and the reported cycle will not be accurate.
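   
   To illustrate the point, here is a small standalone sketch (not the PR code; `SubListDemo` and `cycleFrom` are hypothetical names) that keeps the result of `subList` and closes the loop with the revisited node:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   final class SubListDemo {
       /** Returns the part of path starting at the first occurrence of node, followed by node again. */
       static List<String> cycleFrom(List<String> path, String node) {
           List<String> tmp = new ArrayList<>(path);
           int prevIdx = tmp.indexOf(node);
           if (prevIdx >= 0) {
               // subList returns a view of tmp; the result must be kept, and it is copied here
               // so that the add() below does not write through to the original list
               tmp = new ArrayList<>(tmp.subList(prevIdx, tmp.size()));
           }
           tmp.add(node);
           return tmp;
       }

       public static void main(String[] args) {
           List<String> path = Arrays.asList("spout1", "bolt1", "bolt2", "bolt3");
           System.out.println(cycleFrom(path, "bolt2")); // prints [bolt2, bolt3, bolt2]
       }
   }
   ```
   
   Without the assignment, the returned list would still start at `spout1`.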

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());
+                    }
+                    tmp.add(compId2);
+                    cycle.addAll(tmp);
+                }
+                cycles.add(cycle);
+                edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
+        List<List<String>> ret = new ArrayList<>();
+        Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
+        Set<String> allComponentIds = new HashSet<>();
+        edgesOut.forEach((k, v) -> {
+            allComponentIds.add(k) ;
+            allComponentIds.addAll(v);
+        });
+
+        if (topology.get_spouts_size() == 0) {
+            LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
+            ret.add(new ArrayList(edgesOut.keySet()));
+            return ret;
+        }
+
+        Set<String> unreachable = new HashSet<>(edgesOut.keySet());
+        topology.get_spouts().forEach((spoutId, spout)  -> {
+            Stack<String> dfsStack = new Stack<>();
+            dfsStack.push(spoutId);
+            Set<String> seen = new HashSet<>();
+            seen.add(spoutId);
+            findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
+            unreachable.removeAll(seen);
+        });
+
+        // warning about unreachable components
+        if (!unreachable.isEmpty()) {
+            LOG.warn("Topology {} contains unreachable components \"{}\"", topoId, String.join(",", unreachable));
+        }
+
+        // detected cycles
+        if (!ret.isEmpty()) {
+            LOG.error("Topology {} contains cycles {}", topoId,

Review comment:
       This seems to repeat the log message in `validateConfs` and can probably be removed.

##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +254,184 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName;
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            private IRichSpout makeDummySpout() {
+                return new BaseRichSpout() {
+                    @Override
+                    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+                    }
+
+                    @Override
+                    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+                    }
+
+                    @Override
+                    public void nextTuple() {
+                    }
+
+                    private void writeObject(java.io.ObjectOutputStream stream) {
+                    }
+                };
+            }
+
+            private IStatefulBolt makeDummyStatefulBolt() {

Review comment:
       Can we use a non-stateful bolt? We don't really use stateful bolts anywhere, and they insert extra bolts into the topology, so the unit test result might be hard to understand. See, for example, https://github.com/apache/storm/blob/master/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java#L200-L201

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle detected
+                List<String> cycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    cycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {
+                    cycle.addAll(Arrays.asList(compId1, compId2));
+                } else {
+                    List<String> tmp = Collections.list(stack.elements());
+                    int prevIdx = tmp.indexOf(compId2);
+                    if (prevIdx >= 0) {
+                        tmp.subList(prevIdx, tmp.size());
+                    }
+                    tmp.add(compId2);
+                    cycle.addAll(tmp);
+                }
+                cycles.add(cycle);
+                edgesOut.get(compId1).remove(compId2); // disconnect this cycle
+                continue;
+            }
+            seen.add(compId2);
+            stack.push(compId2);
+            findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
+        }
+        stack.pop();
+    }
+
+    /**
+     * Find and return components cycles in the topology graph when starting from spout.
+     * Return a list of cycles. Each cycle may consist of one or more components.
+     * Components that cannot be reached from any of the spouts are ignored.
+     *
+     * @return a List of cycles. Each cycle has a list of component names.
+     *
+     */
+    public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {

Review comment:
       I haven't read the complete implementation of `findComponentCycles`, but there seems to be an issue with detecting complex cycles.
   
   ```
   tb = new TopologyBuilder();
   tb.setSpout("spout1", new TestWordSpout(), 10);
   tb.setSpout("spout2", new TestWordSpout(), 10);
   tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4");
   tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
   tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
   tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3").shuffleGrouping("spout2");
   ```
   The result is
   ```
    contains cycles bolt3,bolt4 ; spout2,bolt4,bolt3
   ```
   
   `spout2` shouldn't be in the result. Please let me know if I am doing anything wrong.
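   
   For comparison, here is a minimal sketch of one way to report only the components that are actually on a loop. It is a plain depth-first search over an adjacency map, not the PR's implementation; `CycleSketch`, `findCycles`, and the `done` set are hypothetical names. It records one cycle per back edge by slicing the current DFS path from the revisited node, so it is not guaranteed to enumerate every elementary cycle:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   final class CycleSketch {
       static List<List<String>> findCycles(Map<String, List<String>> edgesOut, List<String> roots) {
           List<List<String>> cycles = new ArrayList<>();
           Set<String> done = new HashSet<>(); // nodes whose subtree is fully explored
           for (String root : roots) {
               dfs(root, edgesOut, new ArrayList<>(), done, cycles);
           }
           return cycles;
       }

       private static void dfs(String node, Map<String, List<String>> edgesOut,
                               List<String> path, Set<String> done, List<List<String>> cycles) {
           int idx = path.indexOf(node);
           if (idx >= 0) {
               // node is already on the current path: everything from idx onward forms the loop
               cycles.add(new ArrayList<>(path.subList(idx, path.size())));
               return;
           }
           if (done.contains(node)) {
               return; // explored earlier, possibly from another spout
           }
           path.add(node);
           for (String child : edgesOut.getOrDefault(node, Collections.emptyList())) {
               dfs(child, edgesOut, path, done, cycles);
           }
           path.remove(path.size() - 1);
           done.add(node);
       }

       public static void main(String[] args) {
           // forward edges of the topology from the comment above
           Map<String, List<String>> g = new LinkedHashMap<>();
           g.put("spout1", Arrays.asList("bolt1"));
           g.put("spout2", Arrays.asList("bolt4"));
           g.put("bolt1", Arrays.asList("bolt2"));
           g.put("bolt2", Arrays.asList("bolt3"));
           g.put("bolt3", Arrays.asList("bolt4"));
           g.put("bolt4", Arrays.asList("bolt1", "bolt3"));
           // prints [[bolt1, bolt2, bolt3, bolt4], [bolt3, bolt4]]
           System.out.println(findCycles(g, Arrays.asList("spout1", "spout2")));
       }
   }
   ```
   
   In this sketch neither spout appears in the output, because a spout is never revisited while it is on the current path.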







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482121654



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       Moving it to Utils is also okay, alongside
   `Utils.validateTopologyBlobStoreMap`,
   `Utils.validateTopologyName`,
   etc.
   
   That probably makes more sense, since `findComponentCycles` is implemented inside Utils.
   
   We could also mark `Utils.findComponentCycles` as `@VisibleForTesting`, so that the only public method is `Utils.validateCycleFree` and the internal implementation can be changed later if we'd like.







[GitHub] [storm] bipinprasad commented on pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on pull request #3322:
URL: https://github.com/apache/storm/pull/3322#issuecomment-673680837


   close/reopen to rebuild.





[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482225335



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -51,6 +52,7 @@
 import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.net.ProgressListener;

Review comment:
       No. Will remove this.







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482255674



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);
+            System.out.println(ex.get_msg());

Review comment:
       `LOG.warn` should be sufficient. We don't need System.out.println()







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480451764



##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +247,167 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName.replace(' ', '-');
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            public List<CycleDetectionScenario> createTestScenarios() {
+                List<CycleDetectionScenario> ret = new ArrayList<>();
+                int testNo = 0;
+                CycleDetectionScenario s;
+                TopologyBuilder tb;
+
+                // Base case
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+                            "Three level component hierarchy with no loops",
+                            tb.createTopology(),
+                            0);
+                    ret.add(s);
+                }
+
+                // single loop with one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3  (also connect bolt3 to spout 1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3",

Review comment:
       Yes. Fixed.







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r483199882



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       Moved from StormCommon to Utils.







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r483112937



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);

Review comment:
       Ok. Will switch.







[GitHub] [storm] bipinprasad closed pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad closed pull request #3322:
URL: https://github.com/apache/storm/pull/3322


   





[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482261702



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       The `InvalidTopologyException` constructor does not call `super(msg)`: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/generated/InvalidTopologyException.java#L111
   
   Hence its `msg` field is not available through `Exception.getMessage()`.
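   
   The effect can be reproduced with a small standalone sketch. `FieldOnlyException` below is a hypothetical stand-in that only mimics the shape described above (a message kept in its own field); it is not the generated Storm class:
   
   ```java
   final class MessageDemo {
       /** Hypothetical stand-in: keeps the message in its own field and never passes it to super. */
       static class FieldOnlyException extends Exception {
           private final String msg;

           FieldOnlyException(String msg) {
               // no super(msg) here, so Throwable's detail message stays null
               this.msg = msg;
           }

           String get_msg() {
               return msg;
           }
       }

       public static void main(String[] args) {
           FieldOnlyException ex = new FieldOnlyException("cycle found");
           System.out.println(ex.getMessage()); // prints null
           System.out.println(ex.get_msg());    // prints cycle found
       }
   }
   ```
   
   This is why the log call passes `ex.get_msg()` explicitly instead of relying on the exception's own message.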







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470047775



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of out bound edges of SpoutIds/BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_spouts() != null) {
+            topology.get_spouts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {

Review comment:
       I wondered about the code in `TopologyDetails.getComponents()`, but it seems superfluous. I will remove both the `get_inputs` call on spouts and the `isSystemId` check.







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r483199537



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       That is better than a scary "null" in the stack trace.
   Changed.







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482255674



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);
+            System.out.println(ex.get_msg());

Review comment:
       `LOG.warn` should be sufficient. We don't use `System.out.println()`.







[GitHub] [storm] Ethanlm merged pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm merged pull request #3322:
URL: https://github.com/apache/storm/pull/3322


   





[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r470060170



##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +254,184 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName;
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            private IRichSpout makeDummySpout() {
+                return new BaseRichSpout() {
+                    @Override
+                    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+                    }
+
+                    @Override
+                    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
+                    }
+
+                    @Override
+                    public void nextTuple() {
+                    }
+
+                    private void writeObject(java.io.ObjectOutputStream stream) {
+                    }
+                };
+            }
+
+            private IStatefulBolt makeDummyStatefulBolt() {

Review comment:
       Replaced with TestWordSpout and TestWordCounter







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480408097



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {

Review comment:
       Fixed the NPE. I will also add some randomly generated topologies to the test, to make sure that this code is well exercised.
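
   A rough sketch of what such a randomized check could look like, written against plain adjacency maps rather than real topologies. Everything here is an assumption for illustration: `RandomGraphCheckSketch`, `randomDag`, `hasCycle` and `runRounds` are hypothetical names, and `hasCycle` is a simple stand-in reference checker, not the detector from this PR. In the actual test the generated edges would be turned into a topology and fed to the code under review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

class RandomGraphCheckSketch {

    /** Random DAG: edges only run from a lower index to a higher one, so no cycle can form. */
    static Map<String, Set<String>> randomDag(Random rnd, int nodes, double edgeProbability) {
        Map<String, Set<String>> edgesOut = new HashMap<>();
        for (int from = 0; from < nodes; from++) {
            for (int to = from + 1; to < nodes; to++) {
                if (rnd.nextDouble() < edgeProbability) {
                    edgesOut.computeIfAbsent("c" + from, x -> new HashSet<>()).add("c" + to);
                }
            }
        }
        return edgesOut;
    }

    /** Stand-in reference checker: depth-first search that reports any edge back into the current path. */
    static boolean hasCycle(Map<String, Set<String>> edgesOut) {
        Set<String> inProgress = new HashSet<>();
        Set<String> done = new HashSet<>();
        for (String start : edgesOut.keySet()) {
            if (visit(start, edgesOut, inProgress, done)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String id, Map<String, Set<String>> edgesOut,
                                 Set<String> inProgress, Set<String> done) {
        if (done.contains(id)) {
            return false; // fully explored earlier; a diamond, not a cycle
        }
        if (!inProgress.add(id)) {
            return true; // reached a node that is still on the current path
        }
        // Leaves have no entry in the map, so fall back to an empty set.
        for (String next : edgesOut.getOrDefault(id, Collections.emptySet())) {
            if (visit(next, edgesOut, inProgress, done)) {
                return true;
            }
        }
        inProgress.remove(id);
        done.add(id);
        return false;
    }

    /** Seeded rounds: each DAG must be cycle free, and must report a cycle once a loop is forced in. */
    static boolean runRounds(long seed, int rounds) {
        Random rnd = new Random(seed);
        for (int i = 0; i < rounds; i++) {
            int nodes = 2 + rnd.nextInt(8);
            Map<String, Set<String>> graph = randomDag(rnd, nodes, 0.4);
            if (hasCycle(graph)) {
                return false;
            }
            int a = rnd.nextInt(nodes - 1);
            int b = a + 1 + rnd.nextInt(nodes - a - 1);
            // Force the two-node loop a -> b -> a.
            graph.computeIfAbsent("c" + a, x -> new HashSet<>()).add("c" + b);
            graph.computeIfAbsent("c" + b, x -> new HashSet<>()).add("c" + a);
            if (!hasCycle(graph)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(runRounds(42L, 200));
    }
}
```

   Using a fixed seed keeps a failing round reproducible, which matters more here than broad coverage on any single run.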







[GitHub] [storm] bipinprasad commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
bipinprasad commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r482231921



##########
File path: storm-client/src/jvm/org/apache/storm/StormSubmitter.java
##########
@@ -235,7 +237,14 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology, name);
+        validateConfs(conf);
+
+        try {
+            StormCommon.validateCycleFree(topology, name);
+        } catch (InvalidTopologyException ex) {
+            LOG.warn(ex.get_msg(), ex);

Review comment:
       The constructor seems to imply that the message will not be in the Exception, since the super constructor is not called with the message.
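
   To illustrate the concern, here is a small sketch with a hypothetical exception class (`FieldOnlyMessageException`, not the real InvalidTopologyException) that stores its text in a field without passing it to the super constructor. The `withMessage` helper is likewise an assumption, showing one generic way to surface the text; it is not necessarily what this PR ends up doing.

```java
// Hypothetical exception mimicking the pattern discussed: the text lives in a field,
// and the no-argument Exception constructor is invoked implicitly.
class FieldOnlyMessageException extends Exception {
    private final String msg;

    FieldOnlyMessageException(String msg) {
        // No super(msg) call here, so Throwable.getMessage() stays null.
        this.msg = msg;
    }

    String get_msg() {
        return msg;
    }
}

class ExceptionMessageSketch {

    /** Wraps the exception so the text is also visible through getMessage() and in stack traces. */
    static RuntimeException withMessage(FieldOnlyMessageException ex) {
        return new RuntimeException(ex.get_msg(), ex);
    }

    public static void main(String[] args) {
        FieldOnlyMessageException ex = new FieldOnlyMessageException("cycle in bolt3");
        System.out.println(ex.getMessage());              // the standard accessor has no text
        System.out.println(ex.get_msg());                 // the field accessor does
        System.out.println(withMessage(ex).getMessage()); // the wrapper carries the text
    }
}
```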







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r480389623



##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +247,167 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName.replace(' ', '-');
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            public List<CycleDetectionScenario> createTestScenarios() {
+                List<CycleDetectionScenario> ret = new ArrayList<>();
+                int testNo = 0;
+                CycleDetectionScenario s;
+                TopologyBuilder tb;
+
+                // Base case
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+                            "Three level component hierarchy with no loops",
+                            tb.createTopology(),
+                            0);
+                    ret.add(s);
+                }
+
+                // single loop with one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3  (also connect bolt3 to spout 1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3",

Review comment:
       Should this be "Three Level" component hierarchy?

##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.
+     *
+     * @param topology StormTopology to examine.
+     * @return a map with entry for each SpoutId/BoltId to a set of outbound edges of BoltIds.
+     */
+    private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
+        Map<String, Set<String>> edgesOut = new HashMap<>();
+
+        if (topology.get_bolts() != null) {
+            topology.get_bolts().entrySet().forEach(entry -> {
+                if (!Utils.isSystemId(entry.getKey())) {
+                    entry.getValue().get_common().get_inputs().forEach((k, v) -> {
+                        edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
+                    });
+                }
+            });
+        }
+        return edgesOut;
+    }
+
+    /**
+     * Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
+     * In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
+     *
+     * @param stack used for recursion.
+     * @param edgesOut outbound edge connections, modified when cycle is detected.
+     * @param seen keeps track of component ids that have already been seen.
+     * @param cycles list of cycles seen so far.
+     */
+    private static void findComponentCyclesRecursion(
+            Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
+        if (stack.isEmpty()) {
+            return;
+        }
+        String compId1 = stack.peek();
+        if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
+            stack.pop();
+            return;
+        }
+        Set<String> children = new HashSet<>(edgesOut.get(compId1));
+        for (String compId2: children) {
+            if (seen.contains(compId2)) {
+                // cycle/diamond detected
+                List<String> possibleCycle = new ArrayList<>();
+                if (compId1.equals(compId2)) {
+                    possibleCycle.add(compId2);
+                } else if (edgesOut.get(compId2).contains(compId1)) {

Review comment:
       With a slightly modified test case,
   ```
                       tb.setSpout("spout1", new TestWordSpout(), 10);
                       tb.setSpout("spout2", new TestWordSpout(), 10);
                       tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt4").shuffleGrouping("bolt2");
                       tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("bolt1");
                       tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("bolt2").shuffleGrouping("bolt4");
                       tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("spout2");
   ```
   I am seeing 
   ```
   java.lang.NullPointerException
   	at org.apache.storm.utils.Utils.findComponentCyclesRecursion(Utils.java:1961)
   	at org.apache.storm.utils.Utils.findComponentCyclesRecursion(Utils.java:1981)
   ```
   I am reading the code to try to figure out why.
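
   One possible explanation, not confirmed in this thread: the forward graph only has keys for components with at least one outbound edge, and in the test case above bolt3 is a leaf that can be reached twice (from bolt2 and from bolt4). If the second visit reaches the `seen` branch, `edgesOut.get(compId2)` would be null for bolt3. The sketch below reproduces just that lookup on plain maps; `LeafLookupSketch` and `hasEdge` are hypothetical names, and the null-safe lookup is only one way to guard it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class LeafLookupSketch {

    /** Forward edges for the test case above, keyed only by components that have an outbound edge. */
    static Map<String, Set<String>> edgesOut() {
        Map<String, Set<String>> edges = new HashMap<>();
        edges.put("spout1", new HashSet<>(Arrays.asList("bolt1")));
        edges.put("spout2", new HashSet<>(Arrays.asList("bolt4")));
        edges.put("bolt1", new HashSet<>(Arrays.asList("bolt2")));
        edges.put("bolt2", new HashSet<>(Arrays.asList("bolt1", "bolt3")));
        edges.put("bolt4", new HashSet<>(Arrays.asList("bolt1", "bolt3")));
        return edges; // note: no entry for the leaf bolt3
    }

    /** Null-safe edge test: a component without an entry simply has no outbound edges. */
    static boolean hasEdge(Map<String, Set<String>> edgesOut, String from, String to) {
        return edgesOut.getOrDefault(from, Collections.emptySet()).contains(to);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> edges = edgesOut();
        // edges.get("bolt3").contains("bolt2") would throw a NullPointerException here.
        System.out.println(edges.get("bolt3"));
        System.out.println(hasEdge(edges, "bolt3", "bolt2"));
    }
}
```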

##########
File path: storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
##########
@@ -237,4 +247,167 @@ public void checkVersionInfo() {
         assertNotNull(found);
         assertEquals(key, found.getVersion());
     }
+
+    @Test
+    public void testFindComponentCycles() {
+        class CycleDetectionScenario {
+            final String testName;
+            final String testDescription;
+            final StormTopology topology;
+            final int expectedCycles;
+
+            CycleDetectionScenario() {
+                testName = "dummy";
+                testDescription = "dummy test";
+                topology = null;
+                expectedCycles = 0;
+            }
+
+            CycleDetectionScenario(String testName, String testDescription, StormTopology topology, int expectedCycles) {
+                this.testName = testName.replace(' ', '-');
+                this.testDescription = testDescription;
+                this.topology = topology;
+                this.expectedCycles = expectedCycles;
+            }
+
+            public List<CycleDetectionScenario> createTestScenarios() {
+                List<CycleDetectionScenario> ret = new ArrayList<>();
+                int testNo = 0;
+                CycleDetectionScenario s;
+                TopologyBuilder tb;
+
+                // Base case
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    s = new CycleDetectionScenario(String.format("(%d) Base", testNo),
+                            "Three level component hierarchy with no loops",
+                            tb.createTopology(),
+                            0);
+                    ret.add(s);
+                }
+
+                // single loop with one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3  (also connect bolt3 to spout 1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt3");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // single loop with three bolts
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) One Loop", testNo),
+                            "Four level component hierarchy with 1 cycle in bolt3,bolt4,bolt5",
+                            tb.createTopology(),
+                            1));
+                }
+
+                // two loops with three bolts, and one bolt
+                {
+                    testNo++;
+                    tb = new TopologyBuilder();
+                    tb.setSpout("spout1", new TestWordSpout(), 10);
+                    tb.setBolt("bolt1", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt2", new TestWordCounter(), 10).shuffleGrouping("spout1");
+                    tb.setBolt("bolt11", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt12", new TestWordCounter(), 10).shuffleGrouping("bolt1");
+                    tb.setBolt("bolt21", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    tb.setBolt("bolt22", new TestWordCounter(), 10).shuffleGrouping("bolt2");
+                    // loop bolt 3 -> 4 -> 5 -> 3 (also connect bolt3 to spout1)
+                    tb.setBolt("bolt3", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt5");
+                    tb.setBolt("bolt4", new TestWordCounter(), 10).shuffleGrouping("bolt3");
+                    tb.setBolt("bolt5", new TestWordCounter(), 10).shuffleGrouping("bolt4");
+                    // loop bolt 6  (also connect bolt6 to spout 1)
+                    tb.setBolt("bolt6", new TestWordCounter(), 10).shuffleGrouping("spout1").shuffleGrouping("bolt6");
+                    ret.add(new CycleDetectionScenario(String.format("(%d) Two Loops", testNo),
+                            "Four level component hierarchy with 2 cycles in bolt3,bolt4,bolt5 and bolt6",
+                            tb.createTopology(),
+                            2));
+                }
+
+                // complex cycle
+                {
+                    // (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B2)

Review comment:
       The description seems incorrect.
   
   It should be 
   ```
    // (S1 -> B1 -> B2 -> B3 -> B4 <- S2), (B4 -> B3), (B4 -> B1)
   ```







[GitHub] [storm] Ethanlm commented on a change in pull request #3322: [STORM-3685] Detect and prevent cycles when Topology is submitted.

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3322:
URL: https://github.com/apache/storm/pull/3322#discussion_r481210109



##########
File path: storm-client/src/jvm/org/apache/storm/utils/Utils.java
##########
@@ -1907,4 +1909,117 @@ private void readArchive(ZipFile zipFile) throws IOException {
             }
         }
     }
+
+    /**
+     * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in
+     * the edge. The mapping contains ids of spouts and bolts.

Review comment:
       okay



