You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/28 03:47:27 UTC

[flink] 04/06: [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of Transformation

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9e1738a2e131426ea8fa3051a2f4cba841730eb
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 23 01:11:38 2023 +0800

    [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of Transformation
---
 .../api/transformations/LegacySinkTransformation.java         |  5 +++++
 .../streaming/api/transformations/PhysicalTransformation.java | 11 +++++++++++
 .../translators/AbstractOneInputTransformationTranslator.java |  8 ++++++++
 .../translators/AbstractTwoInputTransformationTranslator.java |  8 ++++++++
 .../translators/LegacySinkTransformationTranslator.java       |  3 +++
 .../translators/LegacySourceTransformationTranslator.java     |  3 +++
 .../translators/MultiInputTransformationTranslator.java       |  3 +++
 .../runtime/translators/SourceTransformationTranslator.java   |  4 ++++
 8 files changed, 45 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
index 8977c7075df..99e0124ece1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
@@ -128,4 +128,9 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
     public final void setChainingStrategy(ChainingStrategy strategy) {
         operatorFactory.setChainingStrategy(strategy);
     }
+
+    @Override
+    public boolean isSupportsConcurrentExecutionAttempts() {
+        return false;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
index 3c2749f4ef4..31b689a77fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 @Internal
 public abstract class PhysicalTransformation<T> extends Transformation<T> {
 
+    private boolean supportsConcurrentExecutionAttempts = true;
+
     /**
      * Creates a new {@code Transformation} with the given name, output type and parallelism.
      *
@@ -47,4 +49,13 @@ public abstract class PhysicalTransformation<T> extends Transformation<T> {
 
     /** Sets the chaining strategy of this {@code Transformation}. */
     public abstract void setChainingStrategy(ChainingStrategy strategy);
+
+    public boolean isSupportsConcurrentExecutionAttempts() {
+        return supportsConcurrentExecutionAttempts;
+    }
+
+    public void setSupportsConcurrentExecutionAttempts(
+            boolean supportsConcurrentExecutionAttempts) {
+        this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
index 03d7e1ab8ce..6ac7ce2c37a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractOneInputTransformationTranslator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 
 import javax.annotation.Nullable;
 
@@ -91,6 +92,13 @@ abstract class AbstractOneInputTransformationTranslator<IN, OUT, OP extends Tran
             streamGraph.addEdge(inputId, transformationId, 0);
         }
 
+        if (transformation instanceof PhysicalTransformation) {
+            streamGraph.setSupportsConcurrentExecutionAttempts(
+                    transformationId,
+                    ((PhysicalTransformation<OUT>) transformation)
+                            .isSupportsConcurrentExecutionAttempts());
+        }
+
         return Collections.singleton(transformationId);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
index 03684b89529..c62a3cbed8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/AbstractTwoInputTransformationTranslator.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 
 import javax.annotation.Nullable;
 
@@ -100,6 +101,13 @@ public abstract class AbstractTwoInputTransformationTranslator<
             streamGraph.addEdge(inputId, transformationId, 2);
         }
 
+        if (transformation instanceof PhysicalTransformation) {
+            streamGraph.setSupportsConcurrentExecutionAttempts(
+                    transformationId,
+                    ((PhysicalTransformation<OUT>) transformation)
+                            .isSupportsConcurrentExecutionAttempts());
+        }
+
         return Collections.singleton(transformationId);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
index 378a7171566..9a571f469dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
@@ -95,6 +95,9 @@ public class LegacySinkTransformationTranslator<IN>
         streamGraph.setParallelism(transformationId, parallelism);
         streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
 
+        streamGraph.setSupportsConcurrentExecutionAttempts(
+                transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
         for (Integer inputId : context.getStreamNodeIds(input)) {
             streamGraph.addEdge(inputId, transformationId, 0);
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
index 8ab5a2a8ee3..c92ba158082 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySourceTransformationTranslator.java
@@ -86,6 +86,9 @@ public class LegacySourceTransformationTranslator<OUT>
         streamGraph.setParallelism(transformationId, parallelism);
         streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
 
+        streamGraph.setSupportsConcurrentExecutionAttempts(
+                transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
         return Collections.singleton(transformationId);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
index d5af8bd8562..e381b47d822 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
@@ -133,6 +133,9 @@ public class MultiInputTransformationTranslator<OUT>
             }
         }
 
+        streamGraph.setSupportsConcurrentExecutionAttempts(
+                transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
         return Collections.singleton(transformationId);
     }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index af251755a77..57ce5baedb0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -95,6 +95,10 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
 
         streamGraph.setParallelism(transformationId, parallelism);
         streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
+
+        streamGraph.setSupportsConcurrentExecutionAttempts(
+                transformationId, transformation.isSupportsConcurrentExecutionAttempts());
+
         return Collections.singleton(transformationId);
     }
 }