You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2021/02/25 18:25:24 UTC

[flink] 01/02: [FLINK-21490][datastream] Make job graph generation deterministic for multiple input nodes.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cedf1c47f20d6c04c4100f930f0e07adab9b4aac
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Feb 24 22:59:56 2021 +0100

    [FLINK-21490][datastream] Make job graph generation deterministic for multiple input nodes.
    
    Traversing the stream graph in a non-deterministic order across the different input nodes during job graph generation causes the input gates of multi-input operators to be swapped on restart. Unaligned checkpoints, however, rely on the assumption that this order is deterministic.
---
 .../api/graph/StreamingJobGraphGenerator.java      |  6 +++-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d0da83b..210ddab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -80,6 +80,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -345,7 +346,10 @@ public class StreamingJobGraphGenerator {
         final Map<Integer, OperatorChainInfo> chainEntryPoints =
                 buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
         final Collection<OperatorChainInfo> initialEntryPoints =
-                new ArrayList<>(chainEntryPoints.values());
+                chainEntryPoints.entrySet().stream()
+                        .sorted(Comparator.comparing(Entry::getKey))
+                        .map(Entry::getValue)
+                        .collect(Collectors.toList());
 
         // iterate over a copy of the values, because this map gets concurrently modified
         for (OperatorChainInfo info : initialEntryPoints) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index ba4f97f..df78107 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -933,6 +933,45 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         assertEquals(4, vertices.get(0).getOperatorIDs().size());
     }
 
+    @Test
+    public void testDeterministicUnionOrder() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+        JobGraph jobGraph = getUnionJobGraph(env);
+        JobVertex jobSink = Iterables.getLast(jobGraph.getVerticesSortedTopologicallyFromSources());
+        List<String> expectedSourceOrder =
+                jobSink.getInputs().stream()
+                        .map(edge -> edge.getSource().getProducer().getName())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < 100; i++) {
+            JobGraph jobGraph2 = getUnionJobGraph(env);
+            JobVertex jobSink2 =
+                    Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
+            assertNotEquals("Different runs should yield different vertexes", jobSink, jobSink2);
+            List<String> actualSourceOrder =
+                    jobSink2.getInputs().stream()
+                            .map(edge -> edge.getSource().getProducer().getName())
+                            .collect(Collectors.toList());
+            assertEquals("Union inputs reordered", expectedSourceOrder, actualSourceOrder);
+        }
+    }
+
+    private JobGraph getUnionJobGraph(StreamExecutionEnvironment env) {
+
+        createSource(env, 1)
+                .union(createSource(env, 2))
+                .union(createSource(env, 3))
+                .union(createSource(env, 4))
+                .addSink(new DiscardingSink<>());
+
+        return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+    }
+
+    private DataStream<Integer> createSource(StreamExecutionEnvironment env, int index) {
+        return env.fromElements(index).name("source" + index).map(i -> i).name("map" + index);
+    }
+
     @Test(expected = UnsupportedOperationException.class)
     public void testNotSupportInputSelectableOperatorIfCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();