You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:26 UTC

[flink] 03/06: [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of StreamGraph and JobGraph

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 242987c75e519ae7b084b19226e598e9663de555
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:00:27 2023 +0800

    [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of StreamGraph and JobGraph
---
 .../executiongraph/SpeculativeExecutionVertex.java |  4 ++
 .../apache/flink/runtime/jobgraph/JobVertex.java   | 15 ++++++
 .../flink/streaming/api/graph/StreamGraph.java     |  8 ++++
 .../flink/streaming/api/graph/StreamNode.java      | 11 +++++
 .../api/graph/StreamingJobGraphGenerator.java      | 26 +++++++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 53 ++++++++++++++++++++++
 6 files changed, 117 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 66d9625b103..48171a81d57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -80,6 +80,10 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
         return getJobVertex().getJobVertex().containsSinks();
     }
 
+    public boolean isSupportsConcurrentExecutionAttempts() {
+        return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts();
+    }
+
     public Execution createNewSpeculativeExecution(final long timestamp) {
         final Execution newExecution = createNewExecution(timestamp);
         getExecutionGraphAccessor().registerExecution(newExecution);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a28a3c43a88..eb322fe72bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -155,6 +155,12 @@ public class JobVertex implements java.io.Serializable {
     /** Indicates whether this job vertex contains sink operators. */
     private boolean containsSinkOperators = false;
 
+    /**
+     * Indicates whether this job vertex supports multiple attempts of the same subtask executing at
+     * the same time.
+     */
+    private boolean supportsConcurrentExecutionAttempts = true;
+
     // --------------------------------------------------------------------------------------------
 
     /**
@@ -553,6 +559,15 @@ public class JobVertex implements java.io.Serializable {
         return containsSinkOperators;
     }
 
+    public void setSupportsConcurrentExecutionAttempts(
+            boolean supportsConcurrentExecutionAttempts) {
+        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+    }
+
+    public boolean isSupportsConcurrentExecutionAttempts() {
+        return supportsConcurrentExecutionAttempts;
+    }
+
     // --------------------------------------------------------------------------------------------
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ad7a91a9bf4..6b082ffd45d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -1066,4 +1066,12 @@ public class StreamGraph implements Pipeline {
     public List<JobStatusHook> getJobStatusHooks() {
         return this.jobStatusHooks;
     }
+
+    public void setSupportsConcurrentExecutionAttempts(
+            Integer vertexId, boolean supportsConcurrentExecutionAttempts) {
+        final StreamNode streamNode = getStreamNode(vertexId);
+        if (streamNode != null) {
+            streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 68edc370b5d..6177c0fdbc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -93,6 +93,8 @@ public class StreamNode {
 
     private @Nullable IntermediateDataSetID consumeClusterDatasetId;
 
+    private boolean supportsConcurrentExecutionAttempts = true;
+
     @VisibleForTesting
     public StreamNode(
             Integer id,
@@ -418,4 +420,13 @@ public class StreamNode {
             @Nullable IntermediateDataSetID consumeClusterDatasetId) {
         this.consumeClusterDatasetId = consumeClusterDatasetId;
     }
+
+    public boolean isSupportsConcurrentExecutionAttempts() {
+        return supportsConcurrentExecutionAttempts;
+    }
+
+    public void setSupportsConcurrentExecutionAttempts(
+            boolean supportsConcurrentExecutionAttempts) {
+        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5161a2586bf..2e7b332cd9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -240,6 +240,7 @@ public class StreamingJobGraphGenerator {
         setPhysicalEdges();
 
         markContainsSourcesOrSinks();
+        markSupportingConcurrentExecutionAttempts();
 
         setSlotSharingAndCoLocation();
 
@@ -1410,6 +1411,31 @@ public class StreamingJobGraphGenerator {
         }
     }
 
+    private void markSupportingConcurrentExecutionAttempts() {
+        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
+            final JobVertex jobVertex = entry.getValue();
+            final Set<Integer> vertexOperators = new HashSet<>();
+            vertexOperators.add(entry.getKey());
+            final Map<Integer, StreamConfig> vertexChainedConfigs =
+                    chainedConfigs.get(entry.getKey());
+            if (vertexChainedConfigs != null) {
+                vertexOperators.addAll(vertexChainedConfigs.keySet());
+            }
+
+            // disable supportConcurrentExecutionAttempts of the job vertex if any stream node
+            // chained into it does not support it
+            boolean supportConcurrentExecutionAttempts = true;
+            for (int nodeId : vertexOperators) {
+                final StreamNode streamNode = streamGraph.getStreamNode(nodeId);
+                if (!streamNode.isSupportsConcurrentExecutionAttempts()) {
+                    supportConcurrentExecutionAttempts = false;
+                    break;
+                }
+            }
+            jobVertex.setSupportsConcurrentExecutionAttempts(supportConcurrentExecutionAttempts);
+        }
+    }
+
     private void setSlotSharingAndCoLocation() {
         setSlotSharing();
         setCoLocation();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 23a481c4ff0..16f16084803 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -132,6 +133,7 @@ import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.InstanceOfAssertFactories.stream;
 
 /** Tests for {@link StreamingJobGraphGenerator}. */
 @ExtendWith(TestLoggerExtension.class)
@@ -1733,6 +1735,57 @@ class StreamingJobGraphGeneratorTest {
                 .hasRootCauseMessage("This provider is not serializable.");
     }
 
+    @Test
+    void testSupportConcurrentExecutionAttempts() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        final DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
+        // source -> (map1 -> map2) -> sink
+        source.rebalance()
+                .map(v -> v)
+                .name("map1")
+                .map(v -> v)
+                .name("map2")
+                .rebalance()
+                .sinkTo(new PrintSink<>())
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        final List<StreamNode> streamNodes =
+                streamGraph.getStreamNodes().stream()
+                        .sorted(Comparator.comparingInt(StreamNode::getId))
+                        .collect(Collectors.toList());
+
+        final StreamNode sourceNode = streamNodes.get(0);
+        final StreamNode map1Node = streamNodes.get(1);
+        final StreamNode map2Node = streamNodes.get(2);
+        final StreamNode sinkNode = streamNodes.get(3);
+        streamGraph.setSupportsConcurrentExecutionAttempts(sourceNode.getId(), true);
+        // map1 and map2 are chained
+        // map1 supports concurrent execution attempts, but map2 does not
+        streamGraph.setSupportsConcurrentExecutionAttempts(map1Node.getId(), true);
+        streamGraph.setSupportsConcurrentExecutionAttempts(map2Node.getId(), false);
+        streamGraph.setSupportsConcurrentExecutionAttempts(sinkNode.getId(), false);
+
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3);
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            if (jobVertex.getName().contains("source")) {
+                assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+            } else if (jobVertex.getName().contains("map")) {
+                // a chained job vertex does not support concurrent execution attempts if any
+                // operator in the chain does not support it
+                assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else if (jobVertex.getName().contains("sink")) {
+                assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else {
+                Assertions.fail("Unexpected job vertex " + jobVertex.getName());
+            }
+        }
+    }
+
     private static class SerializationTestOperatorFactory
             extends AbstractStreamOperatorFactory<Integer>
             implements CoordinatedOperatorFactory<Integer> {