You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:26 UTC
[flink] 03/06: [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of StreamGraph and JobGraph
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 242987c75e519ae7b084b19226e598e9663de555
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:00:27 2023 +0800
[FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of StreamGraph and JobGraph
---
.../executiongraph/SpeculativeExecutionVertex.java | 4 ++
.../apache/flink/runtime/jobgraph/JobVertex.java | 15 ++++++
.../flink/streaming/api/graph/StreamGraph.java | 8 ++++
.../flink/streaming/api/graph/StreamNode.java | 11 +++++
.../api/graph/StreamingJobGraphGenerator.java | 26 +++++++++++
.../api/graph/StreamingJobGraphGeneratorTest.java | 53 ++++++++++++++++++++++
6 files changed, 117 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
index 66d9625b103..48171a81d57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java
@@ -80,6 +80,10 @@ public class SpeculativeExecutionVertex extends ExecutionVertex {
return getJobVertex().getJobVertex().containsSinks();
}
+ /**
+  * Returns whether the job vertex backing this execution vertex allows multiple attempts of the
+  * same subtask to execute at the same time; the value is read straight from that job vertex.
+  */
+ public boolean isSupportsConcurrentExecutionAttempts() {
+ return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts();
+ }
+
public Execution createNewSpeculativeExecution(final long timestamp) {
final Execution newExecution = createNewExecution(timestamp);
getExecutionGraphAccessor().registerExecution(newExecution);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a28a3c43a88..eb322fe72bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -155,6 +155,12 @@ public class JobVertex implements java.io.Serializable {
/** Indicates whether this job vertex contains sink operators. */
private boolean containsSinkOperators = false;
+ /**
+  * Indicates whether this job vertex supports multiple attempts of the same subtask executing at
+  * the same time. Defaults to {@code true}.
+  */
+ private boolean supportsConcurrentExecutionAttempts = true;
+
// --------------------------------------------------------------------------------------------
/**
@@ -553,6 +559,15 @@ public class JobVertex implements java.io.Serializable {
return containsSinkOperators;
}
+ /**
+  * Returns whether multiple attempts of the same subtask of this vertex may run concurrently.
+  *
+  * @return {@code true} unless it has been switched off via the corresponding setter
+  */
+ public boolean isSupportsConcurrentExecutionAttempts() {
+     return supportsConcurrentExecutionAttempts;
+ }
+
+ /**
+  * Sets whether multiple attempts of the same subtask of this vertex may run concurrently.
+  *
+  * @param supportsConcurrentExecutionAttempts the new flag value
+  */
+ public void setSupportsConcurrentExecutionAttempts(
+         boolean supportsConcurrentExecutionAttempts) {
+     this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+ }
+
// --------------------------------------------------------------------------------------------
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ad7a91a9bf4..6b082ffd45d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -1066,4 +1066,12 @@ public class StreamGraph implements Pipeline {
public List<JobStatusHook> getJobStatusHooks() {
return this.jobStatusHooks;
}
+
+ /**
+  * Sets whether the stream node with the given id allows multiple attempts of the same subtask
+  * to execute at the same time. If {@link #getStreamNode} yields {@code null} for the id, the
+  * call is a no-op.
+  *
+  * @param vertexId id of the stream node to update
+  * @param supportsConcurrentExecutionAttempts the new flag value
+  */
+ public void setSupportsConcurrentExecutionAttempts(
+         Integer vertexId, boolean supportsConcurrentExecutionAttempts) {
+     final StreamNode targetNode = getStreamNode(vertexId);
+     if (targetNode == null) {
+         return;
+     }
+     targetNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts);
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 68edc370b5d..6177c0fdbc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -93,6 +93,8 @@ public class StreamNode {
private @Nullable IntermediateDataSetID consumeClusterDatasetId;
+ /**
+  * Whether this node allows multiple attempts of the same subtask to execute at the same time.
+  * Defaults to {@code true}.
+  */
+ private boolean supportsConcurrentExecutionAttempts = true;
+
@VisibleForTesting
public StreamNode(
Integer id,
@@ -418,4 +420,13 @@ public class StreamNode {
@Nullable IntermediateDataSetID consumeClusterDatasetId) {
this.consumeClusterDatasetId = consumeClusterDatasetId;
}
+
+ /**
+  * Sets whether this node allows multiple attempts of the same subtask to run concurrently.
+  *
+  * @param supportsConcurrentExecutionAttempts the new flag value
+  */
+ public void setSupportsConcurrentExecutionAttempts(
+         boolean supportsConcurrentExecutionAttempts) {
+     this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+ }
+
+ /**
+  * Returns whether this node allows multiple attempts of the same subtask to run concurrently.
+  *
+  * @return {@code true} unless it has been switched off via the setter
+  */
+ public boolean isSupportsConcurrentExecutionAttempts() {
+     return supportsConcurrentExecutionAttempts;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5161a2586bf..2e7b332cd9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -240,6 +240,7 @@ public class StreamingJobGraphGenerator {
setPhysicalEdges();
markContainsSourcesOrSinks();
+ markSupportingConcurrentExecutionAttempts();
setSlotSharingAndCoLocation();
@@ -1410,6 +1411,31 @@ public class StreamingJobGraphGenerator {
}
}
+ private void markSupportingConcurrentExecutionAttempts() {
+ for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
+ final JobVertex jobVertex = entry.getValue();
+ final Set<Integer> vertexOperators = new HashSet<>();
+ vertexOperators.add(entry.getKey());
+ final Map<Integer, StreamConfig> vertexChainedConfigs =
+ chainedConfigs.get(entry.getKey());
+ if (vertexChainedConfigs != null) {
+ vertexOperators.addAll(vertexChainedConfigs.keySet());
+ }
+
+ // disable supportConcurrentExecutionAttempts of job vertex if there is any stream node
+ // does not support it
+ boolean supportConcurrentExecutionAttempts = true;
+ for (int nodeId : vertexOperators) {
+ final StreamNode streamNode = streamGraph.getStreamNode(nodeId);
+ if (!streamNode.isSupportsConcurrentExecutionAttempts()) {
+ supportConcurrentExecutionAttempts = false;
+ break;
+ }
+ }
+ jobVertex.setSupportsConcurrentExecutionAttempts(supportConcurrentExecutionAttempts);
+ }
+ }
+
private void setSlotSharingAndCoLocation() {
setSlotSharing();
setCoLocation();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 23a481c4ff0..16f16084803 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -132,6 +133,7 @@ import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.InstanceOfAssertFactories.stream;
/** Tests for {@link StreamingJobGraphGenerator}. */
@ExtendWith(TestLoggerExtension.class)
@@ -1733,6 +1735,57 @@ class StreamingJobGraphGeneratorTest {
.hasRootCauseMessage("This provider is not serializable.");
}
+ @Test
+ void testSupportConcurrentExecutionAttempts() {
+     final StreamExecutionEnvironment env =
+             StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+     // Topology: source -> (map1 -> map2) -> sink; the two maps are chained into one vertex.
+     env.fromElements(1, 2, 3)
+             .name("source")
+             .rebalance()
+             .map(v -> v)
+             .name("map1")
+             .map(v -> v)
+             .name("map2")
+             .rebalance()
+             .sinkTo(new PrintSink<>())
+             .name("sink");
+
+     final StreamGraph streamGraph = env.getStreamGraph();
+     // Relies on stream node ids increasing in creation order: source, map1, map2, sink.
+     final List<Integer> nodeIds =
+             streamGraph.getStreamNodes().stream()
+                     .map(StreamNode::getId)
+                     .sorted()
+                     .collect(Collectors.toList());
+
+     streamGraph.setSupportsConcurrentExecutionAttempts(nodeIds.get(0), true); // source
+     // map1 allows concurrent attempts, but its chained successor map2 does not.
+     streamGraph.setSupportsConcurrentExecutionAttempts(nodeIds.get(1), true); // map1
+     streamGraph.setSupportsConcurrentExecutionAttempts(nodeIds.get(2), false); // map2
+     streamGraph.setSupportsConcurrentExecutionAttempts(nodeIds.get(3), false); // sink
+
+     final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+     assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3);
+
+     for (JobVertex vertex : jobGraph.getVertices()) {
+         final String vertexName = vertex.getName();
+         if (vertexName.contains("source")) {
+             assertThat(vertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+         } else if (vertexName.contains("map") || vertexName.contains("sink")) {
+             // The chained map vertex loses the capability because map2 opts out;
+             // the sink vertex opts out on its own.
+             assertThat(vertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+         } else {
+             Assertions.fail("Unexpected job vertex " + vertexName);
+         }
+     }
+ }
+
private static class SerializationTestOperatorFactory
extends AbstractStreamOperatorFactory<Integer>
implements CoordinatedOperatorFactory<Integer> {