You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:27 UTC
[flink] 04/06: [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of Transformation
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a9e1738a2e131426ea8fa3051a2f4cba841730eb
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:11:38 2023 +0800
[FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of Transformation
---
.../api/transformations/LegacySinkTransformation.java | 5 +++++
.../streaming/api/transformations/PhysicalTransformation.java | 11 +++++++++++
.../translators/AbstractOneInputTransformationTranslator.java | 8 ++++++++
.../translators/AbstractTwoInputTransformationTranslator.java | 8 ++++++++
.../translators/LegacySinkTransformationTranslator.java | 3 +++
.../translators/LegacySourceTransformationTranslator.java | 3 +++
.../translators/MultiInputTransformationTranslator.java | 3 +++
.../runtime/translators/SourceTransformationTranslator.java | 4 ++++
8 files changed, 45 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
index 8977c7075df..99e0124ece1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
@@ -128,4 +128,9 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}
+
+ @Override
+ public boolean isSupportsConcurrentExecutionAttempts() {
+ return false;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
index 3c2749f4ef4..31b689a77fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {
+ private boolean supportsConcurrentExecutionAttempts = true;
+
/**
* Creates a new {@code Transformation} with the given name, output type and parallelism.
*
@@ -47,4 +49,13 @@ public abstract class PhysicalTransformation<T> extends Transformation<T> {
/** Sets the chaining strategy of this {@code Transformation}. */
public abstract void setChainingStrategy(ChainingStrategy strategy);
+
+ public boolean isSupportsConcurrentExecutionAttempts() {
+ return supportsConcurrentExecutionAttempts;
+ }
+
+ public void setSupportsConcurrentExecutionAttempts(
+ boolean supportsConcurrentExecutionAttempts) {
+ this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
index 03d7e1ab8ce..6ac7ce2c37a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import javax.annotation.Nullable;
@@ -91,6 +92,13 @@ abstract class AbstractOneInputTransformationTranslator<IN, OUT, OP extends Tran
streamGraph.addEdge(inputId, transformationId, 0);
}
+ if (transformation instanceof PhysicalTransformation) {
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId,
+ ((PhysicalTransformation<OUT>) transformation)
+ .isSupportsConcurrentExecutionAttempts());
+ }
+
return Collections.singleton(transformationId);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
index 03684b89529..c62a3cbed8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import javax.annotation.Nullable;
@@ -100,6 +101,13 @@ public abstract class AbstractTwoInputTransformationTranslator<
streamGraph.addEdge(inputId, transformationId, 2);
}
+ if (transformation instanceof PhysicalTransformation) {
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId,
+ ((PhysicalTransformation<OUT>) transformation)
+ .isSupportsConcurrentExecutionAttempts());
+ }
+
return Collections.singleton(transformationId);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
index 378a7171566..9a571f469dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
@@ -95,6 +95,9 @@ public class LegacySinkTransformationTranslator<IN>
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
for (Integer inputId : context.getStreamNodeIds(input)) {
streamGraph.addEdge(inputId, transformationId, 0);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
index 8ab5a2a8ee3..c92ba158082 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
@@ -86,6 +86,9 @@ public class LegacySourceTransformationTranslator<OUT>
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
return Collections.singleton(transformationId);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
index d5af8bd8562..e381b47d822 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
@@ -133,6 +133,9 @@ public class MultiInputTransformationTranslator<OUT>
}
}
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
return Collections.singleton(transformationId);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index af251755a77..57ce5baedb0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -95,6 +95,10 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
+
+ streamGraph.setSupportsConcurrentExecutionAttempts(
+ transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
return Collections.singleton(transformationId);
}
}