Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/14 02:20:12 UTC

[flink] 01/02: [FLINK-26610][datastream] Check whether sink uid is set when expanding sink topology.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04a56cb215fa756cfd1868eb8bf69b7fec5b68ae
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Fri Mar 11 19:47:53 2022 +0800

    [FLINK-26610][datastream] Check whether sink uid is set when expanding sink topology.
    
    Sink developers may currently set uids for the operators inside a
    sink's customized topology. In that case, if the sink's own uid is
    not set, adding the same sink multiple times in a job produces
    duplicate operator uids.
---
 .../file/sink/BatchCompactingFileSinkITCase.java   |  6 ++++
 .../file/sink/BatchExecutionFileSinkITCase.java    | 17 +++++++----
 .../sink/StreamingCompactingFileSinkITCase.java    |  6 ++++
 .../sink/StreamingExecutionFileSinkITCase.java     | 15 +++++++---
 .../translators/SinkTransformationTranslator.java  | 35 +++++++++++++++++-----
 5 files changed, 62 insertions(+), 17 deletions(-)
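
For readers of the patch, the user-facing effect is easiest to see in the test changes below: a sink that expands into a customized topology whose internal operators set uids must now be given an explicit uid itself. A minimal sketch of that pattern follows; the environment setup is illustrative and createFileSink(path) is a stand-in helper (as in the tests), not part of this commit:

    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // createFileSink(path) stands in for any sink that expands into a
    // customized topology with internal operator uids.
    DataStreamSink<Integer> sink =
            env.fromElements(1, 2, 3)
                    .sinkTo(createFileSink(path))
                    .setParallelism(1);

    // Without this uid, translation now fails with "Sink ... requires
    // to set a uid since its customized topology has set uid for some
    // operators."
    sink.uid("sink");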

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
index 0ca6820..9e7ef51 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPo
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.Rule;
@@ -60,6 +61,11 @@ public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase
                 .build();
     }
 
+    @Override
+    protected void configureSink(DataStreamSink<Integer> sink) {
+        sink.uid("sink");
+    }
+
     private static FileCompactor createFileCompactor() {
         return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
     }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java
index 11b31a8..89b4c67 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -69,17 +70,21 @@ public class BatchExecutionFileSinkITCase extends FileSinkITBase {
                         "Source",
                         Boundedness.BOUNDED);
 
-        source.setParallelism(NUM_SOURCES)
-                .rebalance()
-                .map(new BatchExecutionOnceFailingMap(NUM_RECORDS, triggerFailover))
-                .setParallelism(NUM_SINKS)
-                .sinkTo(createFileSink(path))
-                .setParallelism(NUM_SINKS);
+        DataStreamSink<Integer> sink =
+                source.setParallelism(NUM_SOURCES)
+                        .rebalance()
+                        .map(new BatchExecutionOnceFailingMap(NUM_RECORDS, triggerFailover))
+                        .setParallelism(NUM_SINKS)
+                        .sinkTo(createFileSink(path))
+                        .setParallelism(NUM_SINKS);
+        configureSink(sink);
 
         StreamGraph streamGraph = env.getStreamGraph();
         return streamGraph.getJobGraph();
     }
 
+    protected void configureSink(DataStreamSink<Integer> sink) {}
+
     // ------------------------ Blocking mode user functions ----------------------------------
 
     /**
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
index 8b3748d..7970746 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPo
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
 import org.junit.Rule;
@@ -60,6 +61,11 @@ public class StreamingCompactingFileSinkITCase extends StreamingExecutionFileSin
                 .build();
     }
 
+    @Override
+    protected void configureSink(DataStreamSink<Integer> sink) {
+        sink.uid("sink");
+    }
+
     private static FileCompactor createFileCompactor() {
         return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
     }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
index c9d8f9f..8f5adc9 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -89,15 +90,21 @@ public class StreamingExecutionFileSinkITCase extends FileSinkITBase {
             env.setRestartStrategy(RestartStrategies.noRestart());
         }
 
-        env.addSource(new StreamingExecutionTestSource(latchId, NUM_RECORDS, triggerFailover))
-                .setParallelism(NUM_SOURCES)
-                .sinkTo(createFileSink(path))
-                .setParallelism(NUM_SINKS);
+        DataStreamSink<Integer> sink =
+                env.addSource(
+                                new StreamingExecutionTestSource(
+                                        latchId, NUM_RECORDS, triggerFailover))
+                        .setParallelism(NUM_SOURCES)
+                        .sinkTo(createFileSink(path))
+                        .setParallelism(NUM_SINKS);
+        configureSink(sink);
 
         StreamGraph streamGraph = env.getStreamGraph();
         return streamGraph.getJobGraph();
     }
 
+    protected void configureSink(DataStreamSink<Integer> sink) {}
+
     // ------------------------ Streaming mode user functions ----------------------------------
 
     private static class StreamingExecutionTestSource extends RichParallelSourceFunction<Integer>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index cb14864..1bdcc50 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -51,6 +51,8 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
  * org.apache.flink.streaming.api.transformations.SinkTransformation}.
@@ -129,7 +131,9 @@ public class SinkTransformationTranslator<Input, Output>
             if (sink instanceof WithPreWriteTopology) {
                 prewritten =
                         adjustTransformations(
-                                prewritten, ((WithPreWriteTopology<T>) sink)::addPreWriteTopology);
+                                prewritten,
+                                ((WithPreWriteTopology<T>) sink)::addPreWriteTopology,
+                                true);
             }
 
             if (sink instanceof TwoPhaseCommittingSink) {
@@ -141,7 +145,8 @@ public class SinkTransformationTranslator<Input, Output>
                                 input.transform(
                                         WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
-                                        new SinkWriterOperatorFactory<>(sink)));
+                                        new SinkWriterOperatorFactory<>(sink)),
+                        false);
             }
 
             final List<Transformation<?>> sinkTransformations =
@@ -172,7 +177,8 @@ public class SinkTransformationTranslator<Input, Output>
                                     input.transform(
                                             WRITER_NAME,
                                             typeInformation,
-                                            new SinkWriterOperatorFactory<>(sink)));
+                                            new SinkWriterOperatorFactory<>(sink)),
+                            false);
 
             DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
 
@@ -180,7 +186,8 @@ public class SinkTransformationTranslator<Input, Output>
                 precommitted =
                         adjustTransformations(
                                 precommitted,
-                                ((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology);
+                                ((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology,
+                                true);
             }
 
             DataStream<CommittableMessage<CommT>> committed =
@@ -193,7 +200,8 @@ public class SinkTransformationTranslator<Input, Output>
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
                                                     isBatchMode,
-                                                    isCheckpointingEnabled)));
+                                                    isCheckpointingEnabled)),
+                            false);
 
             if (sink instanceof WithPostCommitTopology) {
                 DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
@@ -202,7 +210,8 @@ public class SinkTransformationTranslator<Input, Output>
                         pc -> {
                             ((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
                             return null;
-                        });
+                        },
+                        true);
             }
         }
 
@@ -233,7 +242,9 @@ public class SinkTransformationTranslator<Input, Output>
          * customized parallelism value at environment level.
          */
         private <I, R> R adjustTransformations(
-                DataStream<I> inputStream, Function<DataStream<I>, R> action) {
+                DataStream<I> inputStream,
+                Function<DataStream<I>, R> action,
+                boolean isExpandedTopology) {
 
             // Reset the environment parallelism temporarily before adjusting transformations,
             // we can therefore be aware of any customized parallelism of the sub topology
@@ -247,6 +258,16 @@ public class SinkTransformationTranslator<Input, Output>
                     transformations.subList(numTransformsBefore, transformations.size());
 
             for (Transformation<?> subTransformation : expandedTransformations) {
+                String subUid = subTransformation.getUid();
+                if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
+                    checkState(
+                            transformation.getUid() != null && !transformation.getUid().isEmpty(),
+                            "Sink "
+                                    + transformation.getName()
+                                    + " requires to set a uid since its customized topology"
+                                    + " has set uid for some operators.");
+                }
+
                 concatUid(
                         subTransformation,
                         Transformation::getUid,
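
The hunk above shows the core of the change: adjustTransformations gains an isExpandedTopology flag, and for expanded (customized) topologies it checks that the enclosing sink transformation carries a uid whenever any sub-transformation sets one. A hedged sketch of a sink that would trip this check, written against the Sink V2 interfaces as they stood around Flink 1.15 (the class name, identity map, and writer stub are illustrative only, not part of this commit):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class UidSettingSink implements Sink<Integer>, WithPreWriteTopology<Integer> {

        @Override
        public DataStream<Integer> addPreWriteTopology(DataStream<Integer> input) {
            // This internal uid is what now obliges the enclosing sink
            // to carry a uid of its own.
            return input.map(v -> v).returns(Types.INT).uid("pre-write-map");
        }

        @Override
        public SinkWriter<Integer> createWriter(InitContext context) {
            throw new UnsupportedOperationException("illustration only");
        }
    }

    // stream.sinkTo(new UidSettingSink()) now fails at translation time
    // unless the returned DataStreamSink is also given a uid, e.g.
    // stream.sinkTo(new UidSettingSink()).uid("sink");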