You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/02/25 18:25:24 UTC
[flink] 01/02: [FLINK-21490][datastream] Make job graph generation
deterministic for multiple input nodes.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cedf1c47f20d6c04c4100f930f0e07adab9b4aac
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Feb 24 22:59:56 2021 +0100
[FLINK-21490][datastream] Make job graph generation deterministic for multiple input nodes.
Traversing the stream graph from the input nodes in a non-deterministic order during job graph generation can swap the input gates of multi-input operators on restart. Unaligned checkpoints, however, rely on the gate order being deterministic.
---
.../api/graph/StreamingJobGraphGenerator.java | 6 +++-
.../api/graph/StreamingJobGraphGeneratorTest.java | 39 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d0da83b..210ddab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -80,6 +80,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -345,7 +346,10 @@ public class StreamingJobGraphGenerator {
final Map<Integer, OperatorChainInfo> chainEntryPoints =
buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
final Collection<OperatorChainInfo> initialEntryPoints =
- new ArrayList<>(chainEntryPoints.values());
+ chainEntryPoints.entrySet().stream()
+ .sorted(Comparator.comparing(Entry::getKey))
+ .map(Entry::getValue)
+ .collect(Collectors.toList());
// iterate over a copy of the values, because this map gets concurrently modified
for (OperatorChainInfo info : initialEntryPoints) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index ba4f97f..df78107 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -933,6 +933,45 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
assertEquals(4, vertices.get(0).getOperatorIDs().size());
}
+ @Test
+ public void testDeterministicUnionOrder() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ JobGraph jobGraph = getUnionJobGraph(env);
+ JobVertex jobSink = Iterables.getLast(jobGraph.getVerticesSortedTopologicallyFromSources());
+ List<String> expectedSourceOrder =
+ jobSink.getInputs().stream()
+ .map(edge -> edge.getSource().getProducer().getName())
+ .collect(Collectors.toList());
+
+ for (int i = 0; i < 100; i++) {
+ JobGraph jobGraph2 = getUnionJobGraph(env);
+ JobVertex jobSink2 =
+ Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
+ assertNotEquals("Different runs should yield different vertexes", jobSink, jobSink2);
+ List<String> actualSourceOrder =
+ jobSink2.getInputs().stream()
+ .map(edge -> edge.getSource().getProducer().getName())
+ .collect(Collectors.toList());
+ assertEquals("Union inputs reordered", expectedSourceOrder, actualSourceOrder);
+ }
+ }
+
+ private JobGraph getUnionJobGraph(StreamExecutionEnvironment env) {
+
+ createSource(env, 1)
+ .union(createSource(env, 2))
+ .union(createSource(env, 3))
+ .union(createSource(env, 4))
+ .addSink(new DiscardingSink<>());
+
+ return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ }
+
+ private DataStream<Integer> createSource(StreamExecutionEnvironment env, int index) {
+ return env.fromElements(index).name("source" + index).map(i -> i).name("map" + index);
+ }
+
@Test(expected = UnsupportedOperationException.class)
public void testNotSupportInputSelectableOperatorIfCheckpointing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();