You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:23 UTC

[flink] 01/06: [hotfix][core] Do not overwrite the parallelism of expanded sub-transformations without checking whether it is already set when translating a Sink.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a0d6fa4de610ded1220845365c59a84831bc454
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 18:56:41 2022 +0800

    [hotfix][core] Do not overwrite the parallelism of expanded sub-transformations without checking whether it is already set when translating a Sink.
---
 .../runtime/translators/SinkTransformationTranslator.java     | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index d2efbb8..97c19d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
 import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -233,16 +232,6 @@ public class SinkTransformationTranslator<Input, Output>
                     transformations.subList(numTransformsBefore, transformations.size());
 
             for (Transformation<?> subTransformation : expandedTransformations) {
-                // Skip overwriting the parallelism for the global committer
-                if (subTransformation.getName() == null
-                        || !subTransformation
-                                .getName()
-                                .equals(
-                                        StandardSinkTopologies
-                                                .GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
-                    subTransformation.setParallelism(transformation.getParallelism());
-                }
-
                 concatUid(
                         subTransformation,
                         Transformation::getUid,