You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:23 UTC
[flink] 01/06: [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a0d6fa4de610ded1220845365c59a84831bc454
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 18:56:41 2022 +0800
[hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink.
---
.../runtime/translators/SinkTransformationTranslator.java | 11 -----------
1 file changed, 11 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index d2efbb8..97c19d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -233,16 +232,6 @@ public class SinkTransformationTranslator<Input, Output>
transformations.subList(numTransformsBefore, transformations.size());
for (Transformation<?> subTransformation : expandedTransformations) {
- // Skip overwriting the parallelism for the global committer
- if (subTransformation.getName() == null
- || !subTransformation
- .getName()
- .equals(
- StandardSinkTopologies
- .GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
- subTransformation.setParallelism(transformation.getParallelism());
- }
-
concatUid(
subTransformation,
Transformation::getUid,