Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/20 19:35:36 UTC

[GitHub] [flink] azagrebin commented on a change in pull request #13205: [FLINK-17330][runtime] Merge cyclic dependent pipelined regions into one region

azagrebin commented on a change in pull request #13205:
URL: https://github.com/apache/flink/pull/13205#discussion_r474212420



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility for computing strongly connected components.
+ *
+ * <p>The computation is an implementation of Tarjan's algorithm.
+ *
+ * <p>Ref: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
+ */
+public final class StronglyConnectedComponentsComputeUtils {
+
+	static Set<Set<Integer>> computeStronglyConnectedComponents(final int numVertex, final List<List<Integer>> outEdges) {
+		final Set<Set<Integer>> stronglyConnectedComponents = new HashSet<>();
+
+		final int[] vertexIndices = new int[numVertex];
+		Arrays.fill(vertexIndices, -1);
+
+		final int[] vertexLowLinks = new int[numVertex];
+		final Deque<Integer> stack = new ArrayDeque<>(numVertex);

Review comment:
```suggestion
		final Deque<Integer> ccStack = new ArrayDeque<>(numVertex);
```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility for computing strongly connected components.
+ *
+ * <p>The computation is an implementation of Tarjan's algorithm.
+ *
+ * <p>Ref: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
+ */
+public final class StronglyConnectedComponentsComputeUtils {
+
+	static Set<Set<Integer>> computeStronglyConnectedComponents(final int numVertex, final List<List<Integer>> outEdges) {
+		final Set<Set<Integer>> stronglyConnectedComponents = new HashSet<>();
+
+		final int[] vertexIndices = new int[numVertex];
+		Arrays.fill(vertexIndices, -1);
+
+		final int[] vertexLowLinks = new int[numVertex];
+		final Deque<Integer> stack = new ArrayDeque<>(numVertex);
+		final boolean[] onStack = new boolean[numVertex];
+		final AtomicInteger indexCounter = new AtomicInteger(0);
+
+		for (int vertex = 0; vertex < numVertex; vertex++) {
+			if (vertexIndices[vertex] == -1) {
+				dfs(vertex, outEdges, vertexIndices, vertexLowLinks, stack, onStack, indexCounter, stronglyConnectedComponents);
+			}
+		}
+
+		return stronglyConnectedComponents;
+	}
+
+	private static void dfs(
+			final int rootVertex,
+			final List<List<Integer>> outEdges,
+			final int[] vertexIndices,
+			final int[] vertexLowLinks,
+			final Deque<Integer> stack,
+			final boolean[] onStack,
+			final AtomicInteger indexCounter,
+			final Set<Set<Integer>> stronglyConnectedComponents) {
+
+		final Deque<Tuple2<Integer, Integer>> loopStack = new ArrayDeque<>();
+		loopStack.add(new Tuple2<>(rootVertex, 0));
+
+		while (!loopStack.isEmpty()) {
+			Tuple2<Integer, Integer> tuple = loopStack.pollLast();
+			final int currentVertex = tuple.f0;
+			final int vertexLoopCount = tuple.f1;
+
+			if (vertexLoopCount == 0) {
+				vertexIndices[currentVertex] = indexCounter.get();
+				vertexLowLinks[currentVertex] = indexCounter.getAndIncrement();
+				stack.add(currentVertex);
+				onStack[currentVertex] = true;
+			} else if (vertexLoopCount > 0) {
+				final int successorVertex = outEdges.get(currentVertex).get(vertexLoopCount - 1);
+				vertexLowLinks[currentVertex] = Math.min(vertexLowLinks[currentVertex], vertexLowLinks[successorVertex]);
+			}
+
+			boolean visitSuccessorVertex = false;
+			for (int i = vertexLoopCount; i < outEdges.get(currentVertex).size(); i++) {
+				final int successorVertex = outEdges.get(currentVertex).get(i);
+				if (vertexIndices[successorVertex] == -1) {
+					loopStack.add(new Tuple2<>(currentVertex, i + 1));
+					loopStack.add(new Tuple2<>(successorVertex, 0));
+					visitSuccessorVertex = true;
+					break;
+				} else if (onStack[successorVertex]) {
+					vertexLowLinks[currentVertex] = Math.min(vertexLowLinks[currentVertex], vertexIndices[successorVertex]);
+				}
+			}
+
+			if (visitSuccessorVertex) {
+				continue;
+			}
+
+			if (vertexLowLinks[currentVertex] == vertexIndices[currentVertex]) {
+				final Set<Integer> scc = new HashSet<>();
+				while (onStack[currentVertex]) {
+					final int v = stack.pollLast();
+					onStack[v] = false;
+					scc.add(v);
+
+				}
+				stronglyConnectedComponents.add(scc);
+			}
+		}
+	}
+
+	private StronglyConnectedComponentsComputeUtils() {

Review comment:
From what I have seen before, and as we discussed for code style, we usually put instance methods before static methods.
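
   For illustration only (the class and method below are hypothetical, not from the PR), that convention would place the private constructor above the static methods:
```java
// Hypothetical utility class illustrating the ordering convention: the private
// constructor (the only instance-level member) comes before the static methods.
public final class ExampleUtils {

	private ExampleUtils() {
		// Prevents instantiation of the utility class.
	}

	static int doubleOf(final int value) {
		return value * 2;
	}
}
```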

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility for computing strongly connected components.
+ *
+ * <p>The computation is an implementation of Tarjan's algorithm.
+ *
+ * <p>Ref: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
+ */
+public final class StronglyConnectedComponentsComputeUtils {
+
+	static Set<Set<Integer>> computeStronglyConnectedComponents(final int numVertex, final List<List<Integer>> outEdges) {
+		final Set<Set<Integer>> stronglyConnectedComponents = new HashSet<>();
+
+		final int[] vertexIndices = new int[numVertex];
+		Arrays.fill(vertexIndices, -1);
+
+		final int[] vertexLowLinks = new int[numVertex];
+		final Deque<Integer> stack = new ArrayDeque<>(numVertex);
+		final boolean[] onStack = new boolean[numVertex];
+		final AtomicInteger indexCounter = new AtomicInteger(0);
+
+		for (int vertex = 0; vertex < numVertex; vertex++) {
+			if (vertexIndices[vertex] == -1) {
+				dfs(vertex, outEdges, vertexIndices, vertexLowLinks, stack, onStack, indexCounter, stronglyConnectedComponents);
+			}
+		}
+
+		return stronglyConnectedComponents;
+	}
+
+	private static void dfs(
+			final int rootVertex,
+			final List<List<Integer>> outEdges,
+			final int[] vertexIndices,
+			final int[] vertexLowLinks,
+			final Deque<Integer> stack,
+			final boolean[] onStack,
+			final AtomicInteger indexCounter,
+			final Set<Set<Integer>> stronglyConnectedComponents) {
+
+		final Deque<Tuple2<Integer, Integer>> loopStack = new ArrayDeque<>();
+		loopStack.add(new Tuple2<>(rootVertex, 0));
+
+		while (!loopStack.isEmpty()) {
+			Tuple2<Integer, Integer> tuple = loopStack.pollLast();
+			final int currentVertex = tuple.f0;
+			final int vertexLoopCount = tuple.f1;
+
+			if (vertexLoopCount == 0) {
+				vertexIndices[currentVertex] = indexCounter.get();

Review comment:
I would add more comments for the respective code blocks, or maybe even break the loop body into four methods/steps:
```
startTraversingVertex
finishTraversingOutEdge
traverseOutEdges
createConnectedComponent
```
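
   A rough, hypothetical sketch of such a decomposition (class name, field layout, and the plain `int` counter replacing the `AtomicInteger` are editorial assumptions, not part of the PR). Moving the DFS state into fields lets each step of the loop body become a small, named method:
```java
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch only: DFS state lives in fields so the loop body splits into small steps. */
final class TarjanDecompositionSketch {

	private final List<List<Integer>> outEdges;
	private final int[] vertexIndices;
	private final int[] vertexLowLinks;
	private final Deque<Integer> ccStack = new ArrayDeque<>();
	private final boolean[] onStack;
	private final Set<Set<Integer>> components = new HashSet<>();
	private int indexCounter;

	TarjanDecompositionSketch(final int numVertex, final List<List<Integer>> outEdges) {
		this.outEdges = outEdges;
		this.vertexIndices = new int[numVertex];
		Arrays.fill(vertexIndices, -1);
		this.vertexLowLinks = new int[numVertex];
		this.onStack = new boolean[numVertex];
	}

	Set<Set<Integer>> compute() {
		for (int vertex = 0; vertex < vertexIndices.length; vertex++) {
			if (vertexIndices[vertex] == -1) {
				dfs(vertex);
			}
		}
		return components;
	}

	private void dfs(final int rootVertex) {
		final Deque<Tuple2<Integer, Integer>> dfsLoopStack = new ArrayDeque<>();
		dfsLoopStack.add(new Tuple2<>(rootVertex, 0));

		while (!dfsLoopStack.isEmpty()) {
			final Tuple2<Integer, Integer> frame = dfsLoopStack.pollLast();
			final int vertex = frame.f0;
			final int nextOutEdgeIndex = frame.f1;

			if (nextOutEdgeIndex == 0) {
				startTraversingVertex(vertex);
			} else {
				finishTraversingOutEdge(vertex, nextOutEdgeIndex - 1);
			}
			if (traverseOutEdges(vertex, nextOutEdgeIndex, dfsLoopStack)) {
				continue; // descended into an unvisited successor; this vertex resumes later
			}
			createConnectedComponent(vertex);
		}
	}

	private void startTraversingVertex(final int vertex) {
		vertexIndices[vertex] = indexCounter;
		vertexLowLinks[vertex] = indexCounter++;
		ccStack.add(vertex);
		onStack[vertex] = true;
	}

	private void finishTraversingOutEdge(final int vertex, final int outEdgeIndex) {
		final int successor = outEdges.get(vertex).get(outEdgeIndex);
		vertexLowLinks[vertex] = Math.min(vertexLowLinks[vertex], vertexLowLinks[successor]);
	}

	// Returns true if an unvisited successor was pushed, i.e. the DFS descends before finishing this vertex.
	private boolean traverseOutEdges(final int vertex, final int firstOutEdgeIndex, final Deque<Tuple2<Integer, Integer>> dfsLoopStack) {
		for (int i = firstOutEdgeIndex; i < outEdges.get(vertex).size(); i++) {
			final int successor = outEdges.get(vertex).get(i);
			if (vertexIndices[successor] == -1) {
				dfsLoopStack.add(new Tuple2<>(vertex, i + 1));
				dfsLoopStack.add(new Tuple2<>(successor, 0));
				return true;
			} else if (onStack[successor]) {
				vertexLowLinks[vertex] = Math.min(vertexLowLinks[vertex], vertexIndices[successor]);
			}
		}
		return false;
	}

	private void createConnectedComponent(final int vertex) {
		if (vertexLowLinks[vertex] != vertexIndices[vertex]) {
			return; // not the root of a strongly connected component
		}
		final Set<Integer> scc = new HashSet<>();
		while (onStack[vertex]) {
			final int popped = ccStack.pollLast();
			onStack[popped] = false;
			scc.add(popped);
		}
		components.add(scc);
	}
}
```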

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtilsTest.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.runtime.executiongraph.failover.flip1.StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link StronglyConnectedComponentsComputeUtils}.
+ */
+public class StronglyConnectedComponentsComputeUtilsTest extends TestLogger {
+
+	@Test
+	public void testWithCycles() {
+		final List<List<Integer>> edges = Arrays.asList(
+			Arrays.asList(2, 3),
+			Arrays.asList(0),
+			Arrays.asList(1),
+			Arrays.asList(4),
+			Collections.emptyList());
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(5, edges);
+
+		final Set<Set<Integer>> expected = new HashSet<>();
+		expected.add(new HashSet<>(Arrays.asList(0, 1, 2)));
+		expected.add(Collections.singleton(3));
+		expected.add(Collections.singleton(4));
+
+		assertThat(result, is(expected));
+	}
+
+	@Test
+	public void testWithMultipleCycles() {
+		final List<List<Integer>> edges = Arrays.asList(
+			Arrays.asList(1),
+			Arrays.asList(2),
+			Arrays.asList(0),
+			Arrays.asList(1, 2, 4),
+			Arrays.asList(3, 5),
+			Arrays.asList(2, 6),
+			Arrays.asList(5),
+			Arrays.asList(4, 6, 7));
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(8, edges);
+
+		final Set<Set<Integer>> expected = new HashSet<>();
+		expected.add(new HashSet<>(Arrays.asList(0, 1, 2)));
+		expected.add(new HashSet<>(Arrays.asList(3, 4)));
+		expected.add(new HashSet<>(Arrays.asList(5, 6)));
+		expected.add(Collections.singleton(7));
+
+		assertThat(result, is(expected));
+	}
+
+	@Test
+	public void testWithConnectedCycles() {
+		final List<List<Integer>> edges = Arrays.asList(
+			Arrays.asList(1),
+			Arrays.asList(2, 4, 5),
+			Arrays.asList(3, 6),
+			Arrays.asList(2, 7),
+			Arrays.asList(0, 5),
+			Arrays.asList(6),
+			Arrays.asList(5),
+			Arrays.asList(3, 6));
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(8, edges);
+
+		final Set<Set<Integer>> expected = new HashSet<>();
+		expected.add(new HashSet<>(Arrays.asList(0, 1, 4)));
+		expected.add(new HashSet<>(Arrays.asList(2, 3, 7)));
+		expected.add(new HashSet<>(Arrays.asList(5, 6)));
+
+		assertThat(result, is(expected));
+	}
+
+	@Test
+	public void testWithNoEdge() {
+		final List<List<Integer>> edges = Arrays.asList(
+			Collections.emptyList(),
+			Collections.emptyList(),
+			Collections.emptyList(),
+			Collections.emptyList(),
+			Collections.emptyList());
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(5, edges);
+
+		final Set<Set<Integer>> expected = new HashSet<>();
+		expected.add(Collections.singleton(0));
+		expected.add(Collections.singleton(1));
+		expected.add(Collections.singleton(2));
+		expected.add(Collections.singleton(3));
+		expected.add(Collections.singleton(4));
+
+		assertThat(result, is(expected));
+	}
+
+	@Test
+	public void testWithNoCycle() {
+		final List<List<Integer>> edges = Arrays.asList(
+			Arrays.asList(1),
+			Arrays.asList(2),
+			Arrays.asList(3),
+			Arrays.asList(4),
+			Collections.emptyList());
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(5, edges);
+
+		final Set<Set<Integer>> expected = new HashSet<>();
+		expected.add(Collections.singleton(0));
+		expected.add(Collections.singleton(1));
+		expected.add(Collections.singleton(2));
+		expected.add(Collections.singleton(3));
+		expected.add(Collections.singleton(4));
+
+		assertThat(result, is(expected));
+	}
+
+	@Test
+	public void testLargeGraph() {
+		final int n = 100000;
+		final List<List<Integer>> edges = new ArrayList<>();
+		for (int i = 0; i < n; i++) {
+			edges.add(Collections.singletonList((i + 1) % n));
+		}
+
+		final Set<Set<Integer>> result = computeStronglyConnectedComponents(n, edges);
+
+		final Set<Integer> singleComponent = IntStream.range(0, n).boxed().collect(Collectors.toSet());
+
+		assertThat(result, is(Collections.singleton(singleComponent)));
+	}
+
+	@Test
+	public void testArbitraryGraph() {

Review comment:
Do we need this random test? Is it not already covered by the deterministic cases in the other tests?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility for computing strongly connected components.
+ *
+ * <p>The computation is an implementation of Tarjan's algorithm.
+ *
+ * <p>Ref: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
+ */
+public final class StronglyConnectedComponentsComputeUtils {
+
+	static Set<Set<Integer>> computeStronglyConnectedComponents(final int numVertex, final List<List<Integer>> outEdges) {
+		final Set<Set<Integer>> stronglyConnectedComponents = new HashSet<>();
+
+		final int[] vertexIndices = new int[numVertex];
+		Arrays.fill(vertexIndices, -1);
+
+		final int[] vertexLowLinks = new int[numVertex];
+		final Deque<Integer> stack = new ArrayDeque<>(numVertex);
+		final boolean[] onStack = new boolean[numVertex];
+		final AtomicInteger indexCounter = new AtomicInteger(0);
+
+		for (int vertex = 0; vertex < numVertex; vertex++) {
+			if (vertexIndices[vertex] == -1) {
+				dfs(vertex, outEdges, vertexIndices, vertexLowLinks, stack, onStack, indexCounter, stronglyConnectedComponents);
+			}
+		}
+
+		return stronglyConnectedComponents;
+	}
+
+	private static void dfs(
+			final int rootVertex,
+			final List<List<Integer>> outEdges,
+			final int[] vertexIndices,
+			final int[] vertexLowLinks,
+			final Deque<Integer> stack,
+			final boolean[] onStack,
+			final AtomicInteger indexCounter,
+			final Set<Set<Integer>> stronglyConnectedComponents) {
+
+		final Deque<Tuple2<Integer, Integer>> loopStack = new ArrayDeque<>();

Review comment:
```suggestion
		final Deque<Tuple2<Integer, Integer>> dfsLoopStack = new ArrayDeque<>();
```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/StronglyConnectedComponentsComputeUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utility for computing strongly connected components.
+ *
+ * <p>The computation is an implementation of Tarjan's algorithm.
+ *
+ * <p>Ref: https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm.
+ */
+public final class StronglyConnectedComponentsComputeUtils {
+
+	static Set<Set<Integer>> computeStronglyConnectedComponents(final int numVertex, final List<List<Integer>> outEdges) {
+		final Set<Set<Integer>> stronglyConnectedComponents = new HashSet<>();
+
+		final int[] vertexIndices = new int[numVertex];
+		Arrays.fill(vertexIndices, -1);
+
+		final int[] vertexLowLinks = new int[numVertex];
+		final Deque<Integer> stack = new ArrayDeque<>(numVertex);
+		final boolean[] onStack = new boolean[numVertex];
+		final AtomicInteger indexCounter = new AtomicInteger(0);
+
+		for (int vertex = 0; vertex < numVertex; vertex++) {
+			if (vertexIndices[vertex] == -1) {
+				dfs(vertex, outEdges, vertexIndices, vertexLowLinks, stack, onStack, indexCounter, stronglyConnectedComponents);
+			}
+		}
+
+		return stronglyConnectedComponents;
+	}
+
+	private static void dfs(
+			final int rootVertex,
+			final List<List<Integer>> outEdges,
+			final int[] vertexIndices,
+			final int[] vertexLowLinks,
+			final Deque<Integer> stack,
+			final boolean[] onStack,
+			final AtomicInteger indexCounter,
+			final Set<Set<Integer>> stronglyConnectedComponents) {
+
+		final Deque<Tuple2<Integer, Integer>> loopStack = new ArrayDeque<>();
+		loopStack.add(new Tuple2<>(rootVertex, 0));
+
+		while (!loopStack.isEmpty()) {
+			Tuple2<Integer, Integer> tuple = loopStack.pollLast();
+			final int currentVertex = tuple.f0;
+			final int vertexLoopCount = tuple.f1;

Review comment:
```suggestion
			final int vertexOutEdgeIndex = tuple.f1;
```
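
   As a reading aid, here is the push site from the quoted code with editorial comments (not part of the PR) spelling out what the second tuple field represents, which is what motivates the rename:
```java
// Each DFS frame is (vertex, index of the next out-edge to visit): a frame pushed
// with 0 means the vertex has not been started yet; a frame pushed with i + 1 means
// traversal resumes at out-edge i + 1 after returning from the successor on edge i.
loopStack.add(new Tuple2<>(currentVertex, i + 1));
loopStack.add(new Tuple2<>(successorVertex, 0));
```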




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org