Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/28 08:06:11 UTC

[GitHub] [flink] gaoyunhaii commented on a change in pull request #18428: [FLINK-25575] Add Sink V2 operators and translation

gaoyunhaii commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r790400529



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1240,11 +1238,8 @@ public ExecutionConfig getExecutionConfig() {
             ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
         }
 
-        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
+        DataStreamSink<T> sink = DataStreamSink.forSinkFunction(this, clean(sinkFunction));

Review comment:
       Could this be simplified to `return DataStreamSink.forSinkFunction(this, clean(sinkFunction));`?
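
       For illustration, the method tail could then be just (a rough sketch; the surrounding `addSink` body is as in the diff above):

```java
// Hypothetical simplification: return the result of the factory call directly
// instead of binding it to a local variable first.
return DataStreamSink.forSinkFunction(this, clean(sinkFunction));
```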

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
##########
@@ -19,49 +19,46 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.flink.streaming.api.transformations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link Transformation} for {@link Sink}.
  *
  * @param <InputT> The input type of the {@link SinkWriter}
- * @param <CommT> The committable type of the {@link SinkWriter}
- * @param <WriterStateT> The state type of the {@link SinkWriter}
- * @param <GlobalCommT> The global committable type of the {@link
- *     org.apache.flink.api.connector.sink.GlobalCommitter}
+ * @param <OutputT> The output type of the {@link Sink}
  */
 @Internal
-public class SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>
-        extends PhysicalTransformation<Object> {
+public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {
 
+    private final DataStream<InputT> stream;

Review comment:
       Might we rename this to `inputStream` and remove the `input` field? The input transformation could be obtained via `inputStream.getTransformation()`.
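
       A minimal sketch of that shape, assuming `DataStream#getTransformation()` is accessible from here (`getInputTransformation` is a hypothetical accessor name):

```java
// Sketch: keep only the stream; the input transformation is derived on demand.
private final DataStream<InputT> inputStream;

public Transformation<InputT> getInputTransformation() {
    return inputStream.getTransformation();
}
```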

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1341,7 +1351,7 @@ public ExecutionConfig getExecutionConfig() {
                         serializer,
                         accumulatorName,
                         env.getCheckpointConfig());
-        CollectStreamSink<T> sink = new CollectStreamSink<>(this, factory);
+        DataStreamSink<T> sink = new CollectStreamSink<>(this, factory);

Review comment:
       Is this change necessary?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0

Review comment:
       This license header line should not be changed, right?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
##########
@@ -19,49 +19,46 @@ Licensed to the Apache Software Foundation (ASF) under one
 package org.apache.flink.streaming.api.transformations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link Transformation} for {@link Sink}.
  *
  * @param <InputT> The input type of the {@link SinkWriter}
- * @param <CommT> The committable type of the {@link SinkWriter}
- * @param <WriterStateT> The state type of the {@link SinkWriter}
- * @param <GlobalCommT> The global committable type of the {@link
- *     org.apache.flink.api.connector.sink.GlobalCommitter}
+ * @param <OutputT> The output type of the {@link Sink}
  */
 @Internal
-public class SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>
-        extends PhysicalTransformation<Object> {
+public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {
 
+    private final DataStream<InputT> stream;
+    private final Sink<InputT> sink;
     private final Transformation<InputT> input;
 
-    private final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink;
-
     private ChainingStrategy chainingStrategy;
 
     public SinkTransformation(
+            DataStream<InputT> stream,
+            Sink<InputT> sink,
             Transformation<InputT> input,
-            Sink<InputT, CommT, WriterStateT, GlobalCommT> sink,
+            TypeInformation<OutputT> outputType,
             String name,
             int parallelism) {
-        super(name, TypeExtractor.getForClass(Object.class), parallelism);
+        super(name, outputType, parallelism);
+        this.stream = stream;

Review comment:
       Should we also add `checkNotNull` for `stream` and `sink`?
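
       For example (a sketch using the already statically imported `checkNotNull`):

```java
// Sketch: fail fast on null constructor arguments.
this.stream = checkNotNull(stream, "stream must not be null");
this.sink = checkNotNull(sink, "sink must not be null");
```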

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -19,250 +19,237 @@
 package org.apache.flink.streaming.runtime.translators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
-import org.apache.flink.streaming.util.graph.StreamGraphUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.streaming.runtime.operators.sink.WriterOperatorFactory;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** A {@link TransformationTranslator} for the {@link SinkTransformation}. */
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
+ * org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ */
 @Internal
-public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT>
-        implements TransformationTranslator<
-                Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {
+public class SinkTransformationTranslator<Input, Output>
+        implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);
+    private static final String COMMITTER_NAME = "Committer";
+    private static final String WRITER_NAME = "Writer";
 
     @Override
     public Collection<Integer> translateForBatch(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, true, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
-        return Collections.emptyList();
+            SinkTransformation<Input, Output> transformation, Context context) {
+        return translateForStreaming(transformation, context);
     }
 
     @Override
     public Collection<Integer> translateForStreaming(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, false, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
+            SinkTransformation<Input, Output> transformation, Context context) {
 
+        SinkExpander<Input> expander =
+                new SinkExpander<>(
+                        transformation.getStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context);
+        expander.expand();
         return Collections.emptyList();
     }
 
     /**
-     * Add the sink operators to the stream graph.
-     *
-     * @param sinkTransformation The sink transformation that committer and global committer belongs
-     *     to.
-     * @param writerParallelism The parallelism of the writer operator.
-     * @param batch Specifies if this sink is executed in batch mode.
+     * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is created after the
+     * previous part of the topology has been completely configured by the user. For example, if a
+     * user explicitly sets the parallelism of the sink, each part of the subtopology can rely on
+     * the input having that parallelism.
      */
-    private void internalTranslate(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int writerParallelism,
-            boolean batch,
-            Context context)
-            throws IOException {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        boolean needsCommitterOperator =
-                batch && sink.getCommittableSerializer().isPresent()
-                        || sink.getGlobalCommittableSerializer().isPresent();
-        final int writerId =
-                addWriterAndCommitter(
-                        sinkTransformation,
-                        writerParallelism,
-                        batch,
-                        needsCommitterOperator,
-                        context);
-
-        if (needsCommitterOperator) {
-            addGlobalCommitter(writerId, sinkTransformation, batch, context);
+    private static class SinkExpander<T> {
+        private final SinkTransformation<T, ?> transformation;
+        private final Sink<T> sink;
+        private final Context context;
+        private final DataStream<T> inputStream;
+        private final StreamExecutionEnvironment executionEnvironment;
+        private boolean expanded;
+
+        public SinkExpander(
+                DataStream<T> inputStream,
+                Sink<T> sink,
+                SinkTransformation<T, ?> transformation,
+                Context context) {
+            this.inputStream = inputStream;
+            this.executionEnvironment = inputStream.getExecutionEnvironment();
+            this.transformation = transformation;
+            this.sink = sink;
+            this.context = context;
         }
-    }
-
-    /**
-     * Add a sink writer node to the stream graph.
-     *
-     * @param sinkTransformation The transformation that the writer belongs to
-     * @param parallelism The parallelism of the writer
-     * @param batch Specifies if this sink is executed in batch mode.
-     * @param shouldEmit Specifies whether the write should emit committables.
-     * @return The stream node id of the writer
-     */
-    private int addWriterAndCommitter(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int parallelism,
-            boolean batch,
-            boolean shouldEmit,
-            Context context) {
 
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        checkState(sinkTransformation.getInputs().size() == 1);
-        @SuppressWarnings("unchecked")
-        final Transformation<InputT> input =
-                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
-        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();
-
-        final StreamOperatorFactory<byte[]> factory =
-                new SinkOperatorFactory<>(sink, batch, shouldEmit);
-
-        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
-
-        if (chainingStrategy != null) {
-            factory.setChainingStrategy(chainingStrategy);
+        private void expand() {
+            final int sizeBefore = executionEnvironment.getTransformations().size();

Review comment:
       Move this statement after the `expanded` check?
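
       Something along these lines (a sketch of only the reordered method head; the rest of `expand()` is unchanged):

```java
private void expand() {
    if (expanded) {
        // may be called twice for multi-staged applications; expand only once
        return;
    }
    expanded = true;

    // Captured only after the early-return check, so a repeated call does no work.
    final int sizeBefore = executionEnvironment.getTransformations().size();

    // ... remainder of expand() as in the diff above
}
```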

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
##########
@@ -70,13 +73,15 @@
 
         List<Integer> resultIds = new ArrayList<>();
 
+        StreamExchangeMode exchangeMode = transformation.getExchangeMode();
+        if (!supportsBatchExchange && exchangeMode == StreamExchangeMode.BATCH) {
+            exchangeMode = StreamExchangeMode.UNDEFINED;

Review comment:
       Is this change due to `addFailOverRegion` when translating the sink? I would prefer not to do the check afterwards; instead, it seems better to decide whether to add the failover region according to the runtime mode.
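
       A rough sketch of that alternative; `isBatchMode` is a hypothetical flag that would be threaded down from `translateForBatch`/`translateForStreaming`:

```java
// Sketch: choose the exchange mode where the runtime mode is already known,
// instead of downgrading BATCH exchanges inside PartitionTransformationTranslator.
private <I> DataStream<I> addFailOverRegion(DataStream<I> input, boolean isBatchMode) {
    StreamExchangeMode exchangeMode =
            isBatchMode ? StreamExchangeMode.BATCH : StreamExchangeMode.UNDEFINED;
    return new DataStream<>(
            executionEnvironment,
            new PartitionTransformation<>(
                    input.getTransformation(), new ForwardPartitioner<>(), exchangeMode));
}
```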

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -19,250 +19,237 @@
 package org.apache.flink.streaming.runtime.translators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
-import org.apache.flink.streaming.util.graph.StreamGraphUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.streaming.runtime.operators.sink.WriterOperatorFactory;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** A {@link TransformationTranslator} for the {@link SinkTransformation}. */
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
+ * org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ */
 @Internal
-public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT>
-        implements TransformationTranslator<
-                Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {
+public class SinkTransformationTranslator<Input, Output>
+        implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);
+    private static final String COMMITTER_NAME = "Committer";
+    private static final String WRITER_NAME = "Writer";
 
     @Override
     public Collection<Integer> translateForBatch(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, true, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
-        return Collections.emptyList();
+            SinkTransformation<Input, Output> transformation, Context context) {
+        return translateForStreaming(transformation, context);
     }
 
     @Override
     public Collection<Integer> translateForStreaming(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, false, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
+            SinkTransformation<Input, Output> transformation, Context context) {
 
+        SinkExpander<Input> expander =
+                new SinkExpander<>(
+                        transformation.getStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context);
+        expander.expand();
         return Collections.emptyList();
     }
 
     /**
-     * Add the sink operators to the stream graph.
-     *
-     * @param sinkTransformation The sink transformation that committer and global committer belongs
-     *     to.
-     * @param writerParallelism The parallelism of the writer operator.
-     * @param batch Specifies if this sink is executed in batch mode.
+     * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is created after the
+     * previous part of the topology has been completely configured by the user. For example, if a
+     * user explicitly sets the parallelism of the sink, each part of the subtopology can rely on
+     * the input having that parallelism.
      */
-    private void internalTranslate(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int writerParallelism,
-            boolean batch,
-            Context context)
-            throws IOException {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        boolean needsCommitterOperator =
-                batch && sink.getCommittableSerializer().isPresent()
-                        || sink.getGlobalCommittableSerializer().isPresent();
-        final int writerId =
-                addWriterAndCommitter(
-                        sinkTransformation,
-                        writerParallelism,
-                        batch,
-                        needsCommitterOperator,
-                        context);
-
-        if (needsCommitterOperator) {
-            addGlobalCommitter(writerId, sinkTransformation, batch, context);
+    private static class SinkExpander<T> {
+        private final SinkTransformation<T, ?> transformation;
+        private final Sink<T> sink;
+        private final Context context;
+        private final DataStream<T> inputStream;
+        private final StreamExecutionEnvironment executionEnvironment;
+        private boolean expanded;
+
+        public SinkExpander(
+                DataStream<T> inputStream,
+                Sink<T> sink,
+                SinkTransformation<T, ?> transformation,
+                Context context) {
+            this.inputStream = inputStream;
+            this.executionEnvironment = inputStream.getExecutionEnvironment();
+            this.transformation = transformation;
+            this.sink = sink;
+            this.context = context;
         }
-    }
-
-    /**
-     * Add a sink writer node to the stream graph.
-     *
-     * @param sinkTransformation The transformation that the writer belongs to
-     * @param parallelism The parallelism of the writer
-     * @param batch Specifies if this sink is executed in batch mode.
-     * @param shouldEmit Specifies whether the write should emit committables.
-     * @return The stream node id of the writer
-     */
-    private int addWriterAndCommitter(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int parallelism,
-            boolean batch,
-            boolean shouldEmit,
-            Context context) {
 
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        checkState(sinkTransformation.getInputs().size() == 1);
-        @SuppressWarnings("unchecked")
-        final Transformation<InputT> input =
-                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
-        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();
-
-        final StreamOperatorFactory<byte[]> factory =
-                new SinkOperatorFactory<>(sink, batch, shouldEmit);
-
-        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
-
-        if (chainingStrategy != null) {
-            factory.setChainingStrategy(chainingStrategy);
+        private void expand() {
+            final int sizeBefore = executionEnvironment.getTransformations().size();
+            if (expanded) {
+                // may be called twice for multi-staged application, make sure to expand only once
+                return;
+            }
+            expanded = true;
+
+            DataStream<T> prewritten = inputStream;
+            if (sink instanceof WithPreWriteTopology) {
+                prewritten =
+                        adjustTransformations(
+                                prewritten, ((WithPreWriteTopology<T>) sink)::addPreWriteTopology);
+            }
+
+            if (sink instanceof TwoPhaseCommittingSink) {
+                addCommittingTopology(sink, prewritten);
+            } else {
+                adjustTransformations(
+                        prewritten,
+                        input ->
+                                input.transform(
+                                        WRITER_NAME,
+                                        CommittableMessageTypeInfo.noOutput(),
+                                        new WriterOperatorFactory<>(sink)));
+            }
+            final List<Transformation<?>> sinkTransformations =
+                    executionEnvironment
+                            .getTransformations()
+                            .subList(sizeBefore, executionEnvironment.getTransformations().size());
+            sinkTransformations.forEach(context::transform);
         }
 
-        final String format = batch && shouldEmit ? "Sink %s Writer" : "Sink %s";
-
-        return addOperatorToStreamGraph(
-                factory,
-                context.getStreamNodeIds(input),
-                inputTypeInfo,
-                TypeInformation.of(byte[].class),
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid(),
-                parallelism,
-                sinkTransformation.getMaxParallelism(),
-                sinkTransformation,
-                context);
-    }
-
-    /**
-     * Try to add a sink global committer to the stream graph.
-     *
-     * @param inputId The global committer's input stream node id.
-     * @param sinkTransformation The transformation that the global committer belongs to.
-     * @param batch Specifies if this sink is executed in batch mode.
-     */
-    private void addGlobalCommitter(
-            int inputId,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            boolean batch,
-            Context context) {
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-
-        final String format = batch ? "Sink %s Committer" : "Sink %s Global Committer";
-
-        addOperatorToStreamGraph(
-                new CommitterOperatorFactory<>(sink, batch),
-                Collections.singletonList(inputId),
-                TypeInformation.of(byte[].class),
-                null,
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid() == null
-                        ? null
-                        : String.format(format, sinkTransformation.getUid()),
-                1,
-                1,
-                sinkTransformation,
-                context);
-    }
-
-    private int getParallelism(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        return sinkTransformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-                ? sinkTransformation.getParallelism()
-                : context.getStreamGraph().getExecutionConfig().getParallelism();
-    }
-
-    /**
-     * Add a operator to the {@link StreamGraph}.
-     *
-     * @param operatorFactory The operator factory
-     * @param inputs A collection of upstream stream node ids.
-     * @param inTypeInfo The input type information of the operator
-     * @param outTypInfo The output type information of the operator
-     * @param name The name of the operator.
-     * @param uid The uid of the operator.
-     * @param parallelism The parallelism of the operator
-     * @param maxParallelism The max parallelism of the operator
-     * @param sinkTransformation The sink transformation which the operator belongs to
-     * @return The stream node id of the operator
-     */
-    private <IN, OUT> int addOperatorToStreamGraph(
-            StreamOperatorFactory<OUT> operatorFactory,
-            Collection<Integer> inputs,
-            TypeInformation<IN> inTypeInfo,
-            TypeInformation<OUT> outTypInfo,
-            String name,
-            @Nullable String uid,
-            int parallelism,
-            int maxParallelism,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        final StreamGraph streamGraph = context.getStreamGraph();
-        final String slotSharingGroup = context.getSlotSharingGroup();
-        final int transformationId = Transformation.getNewNodeId();
-
-        streamGraph.addOperator(
-                transformationId,
-                slotSharingGroup,
-                sinkTransformation.getCoLocationGroupKey(),
-                operatorFactory,
-                inTypeInfo,
-                outTypInfo,
-                name);
+        private <T, CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
+            TwoPhaseCommittingSink<T, CommT> committingSink =
+                    (TwoPhaseCommittingSink<T, CommT>) sink;
+            TypeInformation<CommittableMessage<CommT>> typeInformation =
+                    CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
+
+            DataStream<CommittableMessage<CommT>> written =
+                    adjustTransformations(
+                            inputStream,
+                            input ->
+                                    input.transform(
+                                            WRITER_NAME,
+                                            typeInformation,
+                                            new WriterOperatorFactory<>(sink)));
+
+            DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
+
+            if (sink instanceof WithPreCommitTopology) {
+                precommitted =
+                        adjustTransformations(
+                                precommitted,
+                                ((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology);
+            }
+            DataStream<CommittableMessage<CommT>> committed =
+                    adjustTransformations(
+                            precommitted,
+                            pc ->
+                                    pc.transform(
+                                            COMMITTER_NAME,
+                                            typeInformation,
+                                            new CommitterOperatorFactory<>(committingSink)));
+            if (sink instanceof WithPostCommitTopology) {
+                DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
+                adjustTransformations(
+                        postcommitted,
+                        pc -> {
+                            ((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
+                            return null;
+                        });
+            }
+        }
 
-        streamGraph.setParallelism(transformationId, parallelism);
-        streamGraph.setMaxParallelism(transformationId, maxParallelism);
+        /**
+         * Adds a batch exchange that materializes the output first. This is a no-op in STREAMING.
+         */
+        private <T> DataStream<T> addFailOverRegion(DataStream<T> input) {
+            return new DataStream<>(
+                    executionEnvironment,
+                    new PartitionTransformation<>(
+                            input.getTransformation(),
+                            new ForwardPartitioner<>(),
+                            StreamExchangeMode.BATCH));
+        }
 
-        StreamGraphUtils.configureBufferTimeout(
-                streamGraph,
-                transformationId,
-                sinkTransformation,
-                context.getDefaultBufferTimeout());
-        if (uid != null) {
-            streamGraph.setTransformationUID(transformationId, uid);
+        private <T, R> R adjustTransformations(
+                DataStream<T> inputStream, Function<DataStream<T>, R> action) {
+            int numTransformsBefore = executionEnvironment.getTransformations().size();
+            R result = action.apply(inputStream);
+            List<Transformation<?>> transformations = executionEnvironment.getTransformations();
+            List<Transformation<?>> expandedTransformations =
+                    transformations.subList(numTransformsBefore, transformations.size());
+            for (Transformation<?> subTransformation : expandedTransformations) {
+                concatUid(
+                        subTransformation,
+                        Transformation::getUid,
+                        Transformation::setUid,
+                        subTransformation.getName());
+                concatProperty(subTransformation, Transformation::getName, Transformation::setName);
+                concatProperty(
+                        subTransformation,
+                        Transformation::getDescription,
+                        Transformation::setDescription);
+
+                Optional<SlotSharingGroup> ssg = transformation.getSlotSharingGroup();
+                if (ssg.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
+                    subTransformation.setSlotSharingGroup(ssg.get());
+                }
+                subTransformation.setParallelism(transformation.getParallelism());

Review comment:
       Should we also propagate other properties, such as `maxParallelism` and `coLocationGroupKey`?
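
       For example (a sketch; assuming the corresponding getters and setters on `Transformation`, as used elsewhere in this class):

```java
// Sketch: also copy these properties onto each expanded sub-transformation.
subTransformation.setMaxParallelism(transformation.getMaxParallelism());
if (transformation.getCoLocationGroupKey() != null) {
    subTransformation.setCoLocationGroupKey(transformation.getCoLocationGroupKey());
}
```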

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
##########
@@ -19,250 +19,237 @@
 package org.apache.flink.streaming.runtime.translators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
-import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
-import org.apache.flink.streaming.util.graph.StreamGraphUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.streaming.runtime.operators.sink.WriterOperatorFactory;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/** A {@link TransformationTranslator} for the {@link SinkTransformation}. */
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
+ * org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ */
 @Internal
-public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT>
-        implements TransformationTranslator<
-                Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {
+public class SinkTransformationTranslator<Input, Output>
+        implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);
+    private static final String COMMITTER_NAME = "Committer";
+    private static final String WRITER_NAME = "Writer";
 
     @Override
     public Collection<Integer> translateForBatch(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, true, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
-        return Collections.emptyList();
+            SinkTransformation<Input, Output> transformation, Context context) {
+        return translateForStreaming(transformation, context);
     }
 
     @Override
     public Collection<Integer> translateForStreaming(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
-            Context context) {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
-
-        final int parallelism = getParallelism(transformation, context);
-
-        try {
-            internalTranslate(transformation, parallelism, false, context);
-        } catch (IOException e) {
-            throw new FlinkRuntimeException(
-                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
-        }
+            SinkTransformation<Input, Output> transformation, Context context) {
 
+        SinkExpander<Input> expander =
+                new SinkExpander<>(
+                        transformation.getStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context);
+        expander.expand();
         return Collections.emptyList();
     }
 
     /**
-     * Add the sink operators to the stream graph.
-     *
-     * @param sinkTransformation The sink transformation that committer and global committer belongs
-     *     to.
-     * @param writerParallelism The parallelism of the writer operator.
-     * @param batch Specifies if this sink is executed in batch mode.
+     * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is created after the
+     * previous part of the topology has been completely configured by the user. For example, if a
+     * user explicitly sets the parallelism of the sink, each part of the subtopology can rely on
+     * the input having that parallelism.
      */
-    private void internalTranslate(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int writerParallelism,
-            boolean batch,
-            Context context)
-            throws IOException {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        boolean needsCommitterOperator =
-                batch && sink.getCommittableSerializer().isPresent()
-                        || sink.getGlobalCommittableSerializer().isPresent();
-        final int writerId =
-                addWriterAndCommitter(
-                        sinkTransformation,
-                        writerParallelism,
-                        batch,
-                        needsCommitterOperator,
-                        context);
-
-        if (needsCommitterOperator) {
-            addGlobalCommitter(writerId, sinkTransformation, batch, context);
+    private static class SinkExpander<T> {
+        private final SinkTransformation<T, ?> transformation;
+        private final Sink<T> sink;
+        private final Context context;
+        private final DataStream<T> inputStream;
+        private final StreamExecutionEnvironment executionEnvironment;
+        private boolean expanded;
+
+        public SinkExpander(
+                DataStream<T> inputStream,
+                Sink<T> sink,
+                SinkTransformation<T, ?> transformation,
+                Context context) {
+            this.inputStream = inputStream;
+            this.executionEnvironment = inputStream.getExecutionEnvironment();
+            this.transformation = transformation;
+            this.sink = sink;
+            this.context = context;
         }
-    }
-
-    /**
-     * Add a sink writer node to the stream graph.
-     *
-     * @param sinkTransformation The transformation that the writer belongs to
-     * @param parallelism The parallelism of the writer
-     * @param batch Specifies if this sink is executed in batch mode.
-     * @param shouldEmit Specifies whether the write should emit committables.
-     * @return The stream node id of the writer
-     */
-    private int addWriterAndCommitter(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int parallelism,
-            boolean batch,
-            boolean shouldEmit,
-            Context context) {
 
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        checkState(sinkTransformation.getInputs().size() == 1);
-        @SuppressWarnings("unchecked")
-        final Transformation<InputT> input =
-                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
-        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();
-
-        final StreamOperatorFactory<byte[]> factory =
-                new SinkOperatorFactory<>(sink, batch, shouldEmit);
-
-        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
-
-        if (chainingStrategy != null) {
-            factory.setChainingStrategy(chainingStrategy);
+        private void expand() {
+            final int sizeBefore = executionEnvironment.getTransformations().size();
+            if (expanded) {
+                // may be called twice for multi-staged application, make sure to expand only once
+                return;
+            }
+            expanded = true;
+
+            DataStream<T> prewritten = inputStream;
+            if (sink instanceof WithPreWriteTopology) {
+                prewritten =
+                        adjustTransformations(
+                                prewritten, ((WithPreWriteTopology<T>) sink)::addPreWriteTopology);
+            }
+
+            if (sink instanceof TwoPhaseCommittingSink) {
+                addCommittingTopology(sink, prewritten);
+            } else {
+                adjustTransformations(
+                        prewritten,
+                        input ->
+                                input.transform(
+                                        WRITER_NAME,
+                                        CommittableMessageTypeInfo.noOutput(),
+                                        new WriterOperatorFactory<>(sink)));
+            }
+            final List<Transformation<?>> sinkTransformations =
+                    executionEnvironment
+                            .getTransformations()
+                            .subList(sizeBefore, executionEnvironment.getTransformations().size());
+            sinkTransformations.forEach(context::transform);
         }
 
-        final String format = batch && shouldEmit ? "Sink %s Writer" : "Sink %s";
-
-        return addOperatorToStreamGraph(
-                factory,
-                context.getStreamNodeIds(input),
-                inputTypeInfo,
-                TypeInformation.of(byte[].class),
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid(),
-                parallelism,
-                sinkTransformation.getMaxParallelism(),
-                sinkTransformation,
-                context);
-    }
-
-    /**
-     * Try to add a sink global committer to the stream graph.
-     *
-     * @param inputId The global committer's input stream node id.
-     * @param sinkTransformation The transformation that the global committer belongs to.
-     * @param batch Specifies if this sink is executed in batch mode.
-     */
-    private void addGlobalCommitter(
-            int inputId,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            boolean batch,
-            Context context) {
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-
-        final String format = batch ? "Sink %s Committer" : "Sink %s Global Committer";
-
-        addOperatorToStreamGraph(
-                new CommitterOperatorFactory<>(sink, batch),
-                Collections.singletonList(inputId),
-                TypeInformation.of(byte[].class),
-                null,
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid() == null
-                        ? null
-                        : String.format(format, sinkTransformation.getUid()),
-                1,
-                1,
-                sinkTransformation,
-                context);
-    }
-
-    private int getParallelism(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        return sinkTransformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-                ? sinkTransformation.getParallelism()
-                : context.getStreamGraph().getExecutionConfig().getParallelism();
-    }
-
-    /**
-     * Add a operator to the {@link StreamGraph}.
-     *
-     * @param operatorFactory The operator factory
-     * @param inputs A collection of upstream stream node ids.
-     * @param inTypeInfo The input type information of the operator
-     * @param outTypInfo The output type information of the operator
-     * @param name The name of the operator.
-     * @param uid The uid of the operator.
-     * @param parallelism The parallelism of the operator
-     * @param maxParallelism The max parallelism of the operator
-     * @param sinkTransformation The sink transformation which the operator belongs to
-     * @return The stream node id of the operator
-     */
-    private <IN, OUT> int addOperatorToStreamGraph(
-            StreamOperatorFactory<OUT> operatorFactory,
-            Collection<Integer> inputs,
-            TypeInformation<IN> inTypeInfo,
-            TypeInformation<OUT> outTypInfo,
-            String name,
-            @Nullable String uid,
-            int parallelism,
-            int maxParallelism,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        final StreamGraph streamGraph = context.getStreamGraph();
-        final String slotSharingGroup = context.getSlotSharingGroup();
-        final int transformationId = Transformation.getNewNodeId();
-
-        streamGraph.addOperator(
-                transformationId,
-                slotSharingGroup,
-                sinkTransformation.getCoLocationGroupKey(),
-                operatorFactory,
-                inTypeInfo,
-                outTypInfo,
-                name);
+        private <T, CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
+            TwoPhaseCommittingSink<T, CommT> committingSink =
+                    (TwoPhaseCommittingSink<T, CommT>) sink;
+            TypeInformation<CommittableMessage<CommT>> typeInformation =
+                    CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
+
+            DataStream<CommittableMessage<CommT>> written =
+                    adjustTransformations(
+                            inputStream,
+                            input ->
+                                    input.transform(
+                                            WRITER_NAME,
+                                            typeInformation,
+                                            new WriterOperatorFactory<>(sink)));
+
+            DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
+
+            if (sink instanceof WithPreCommitTopology) {
+                precommitted =
+                        adjustTransformations(
+                                precommitted,
+                                ((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology);
+            }
+            DataStream<CommittableMessage<CommT>> committed =
+                    adjustTransformations(
+                            precommitted,
+                            pc ->
+                                    pc.transform(
+                                            COMMITTER_NAME,
+                                            typeInformation,
+                                            new CommitterOperatorFactory<>(committingSink)));
+            if (sink instanceof WithPostCommitTopology) {
+                DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
+                adjustTransformations(
+                        postcommitted,
+                        pc -> {
+                            ((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
+                            return null;
+                        });
+            }
+        }
 
-        streamGraph.setParallelism(transformationId, parallelism);
-        streamGraph.setMaxParallelism(transformationId, maxParallelism);
+        /**
+         * Adds a batch exchange that materializes the output first. This is a no-op in STREAMING.
+         */
+        private <T> DataStream<T> addFailOverRegion(DataStream<T> input) {
+            return new DataStream<>(
+                    executionEnvironment,
+                    new PartitionTransformation<>(
+                            input.getTransformation(),
+                            new ForwardPartitioner<>(),
+                            StreamExchangeMode.BATCH));
+        }
 
-        StreamGraphUtils.configureBufferTimeout(
-                streamGraph,
-                transformationId,
-                sinkTransformation,
-                context.getDefaultBufferTimeout());
-        if (uid != null) {
-            streamGraph.setTransformationUID(transformationId, uid);
+        private <T, R> R adjustTransformations(
+                DataStream<T> inputStream, Function<DataStream<T>, R> action) {
+            int numTransformsBefore = executionEnvironment.getTransformations().size();
+            R result = action.apply(inputStream);
+            List<Transformation<?>> transformations = executionEnvironment.getTransformations();
+            List<Transformation<?>> expandedTransformations =
+                    transformations.subList(numTransformsBefore, transformations.size());
+            for (Transformation<?> subTransformation : expandedTransformations) {
+                concatUid(
+                        subTransformation,
+                        Transformation::getUid,
+                        Transformation::setUid,
+                        subTransformation.getName());
+                concatProperty(subTransformation, Transformation::getName, Transformation::setName);
+                concatProperty(
+                        subTransformation,
+                        Transformation::getDescription,
+                        Transformation::setDescription);
+
+                Optional<SlotSharingGroup> ssg = transformation.getSlotSharingGroup();
+                if (ssg.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
+                    subTransformation.setSlotSharingGroup(ssg.get());
+                }
+                subTransformation.setParallelism(transformation.getParallelism());

Review comment:
       Should we skip updating the parallelism when it has been set explicitly by the user?
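
       For instance (a sketch; `ExecutionConfig.PARALLELISM_DEFAULT` is the "not set" sentinel that the old code in this file already used):

```java
// Sketch: only inherit the sink's parallelism when none was set explicitly.
if (subTransformation.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
    subTransformation.setParallelism(transformation.getParallelism());
}
```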

+                new SinkExpander<>(
+                        transformation.getStream(),
+                        transformation.getSink(),
+                        transformation,
+                        context);
+        expander.expand();
         return Collections.emptyList();
     }
 
     /**
-     * Add the sink operators to the stream graph.
-     *
-     * @param sinkTransformation The sink transformation that committer and global committer belongs
-     *     to.
-     * @param writerParallelism The parallelism of the writer operator.
-     * @param batch Specifies if this sink is executed in batch mode.
+     * Expands the FLIP-143 Sink to a subtopology. Each part of the topology is created after the
+     * previous part of the topology has been completely configured by the user. For example, if a
+     * user explicitly sets the parallelism of the sink, each part of the subtopology can rely on
+     * the input having that parallelism.
      */
-    private void internalTranslate(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int writerParallelism,
-            boolean batch,
-            Context context)
-            throws IOException {
-
-        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        boolean needsCommitterOperator =
-                batch && sink.getCommittableSerializer().isPresent()
-                        || sink.getGlobalCommittableSerializer().isPresent();
-        final int writerId =
-                addWriterAndCommitter(
-                        sinkTransformation,
-                        writerParallelism,
-                        batch,
-                        needsCommitterOperator,
-                        context);
-
-        if (needsCommitterOperator) {
-            addGlobalCommitter(writerId, sinkTransformation, batch, context);
+    private static class SinkExpander<T> {
+        private final SinkTransformation<T, ?> transformation;
+        private final Sink<T> sink;
+        private final Context context;
+        private final DataStream<T> inputStream;
+        private final StreamExecutionEnvironment executionEnvironment;
+        private boolean expanded;
+
+        public SinkExpander(
+                DataStream<T> inputStream,
+                Sink<T> sink,
+                SinkTransformation<T, ?> transformation,
+                Context context) {
+            this.inputStream = inputStream;
+            this.executionEnvironment = inputStream.getExecutionEnvironment();
+            this.transformation = transformation;
+            this.sink = sink;
+            this.context = context;
         }
-    }
-
-    /**
-     * Add a sink writer node to the stream graph.
-     *
-     * @param sinkTransformation The transformation that the writer belongs to
-     * @param parallelism The parallelism of the writer
-     * @param batch Specifies if this sink is executed in batch mode.
-     * @param shouldEmit Specifies whether the write should emit committables.
-     * @return The stream node id of the writer
-     */
-    private int addWriterAndCommitter(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            int parallelism,
-            boolean batch,
-            boolean shouldEmit,
-            Context context) {
 
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-        checkState(sinkTransformation.getInputs().size() == 1);
-        @SuppressWarnings("unchecked")
-        final Transformation<InputT> input =
-                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
-        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();
-
-        final StreamOperatorFactory<byte[]> factory =
-                new SinkOperatorFactory<>(sink, batch, shouldEmit);
-
-        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
-
-        if (chainingStrategy != null) {
-            factory.setChainingStrategy(chainingStrategy);
+        private void expand() {
+            final int sizeBefore = executionEnvironment.getTransformations().size();
+            if (expanded) {
+                // may be called twice for multi-staged application, make sure to expand only once
+                return;
+            }
+            expanded = true;
+
+            DataStream<T> prewritten = inputStream;
+            if (sink instanceof WithPreWriteTopology) {
+                prewritten =
+                        adjustTransformations(
+                                prewritten, ((WithPreWriteTopology<T>) sink)::addPreWriteTopology);
+            }
+
+            if (sink instanceof TwoPhaseCommittingSink) {
+                addCommittingTopology(sink, prewritten);
+            } else {
+                adjustTransformations(
+                        prewritten,
+                        input ->
+                                input.transform(
+                                        WRITER_NAME,
+                                        CommittableMessageTypeInfo.noOutput(),
+                                        new WriterOperatorFactory<>(sink)));
+            }
+            final List<Transformation<?>> sinkTransformations =
+                    executionEnvironment
+                            .getTransformations()
+                            .subList(sizeBefore, executionEnvironment.getTransformations().size());
+            sinkTransformations.forEach(context::transform);
         }
 
-        final String format = batch && shouldEmit ? "Sink %s Writer" : "Sink %s";
-
-        return addOperatorToStreamGraph(
-                factory,
-                context.getStreamNodeIds(input),
-                inputTypeInfo,
-                TypeInformation.of(byte[].class),
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid(),
-                parallelism,
-                sinkTransformation.getMaxParallelism(),
-                sinkTransformation,
-                context);
-    }
-
-    /**
-     * Try to add a sink global committer to the stream graph.
-     *
-     * @param inputId The global committer's input stream node id.
-     * @param sinkTransformation The transformation that the global committer belongs to.
-     * @param batch Specifies if this sink is executed in batch mode.
-     */
-    private void addGlobalCommitter(
-            int inputId,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            boolean batch,
-            Context context) {
-
-        Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
-
-        final String format = batch ? "Sink %s Committer" : "Sink %s Global Committer";
-
-        addOperatorToStreamGraph(
-                new CommitterOperatorFactory<>(sink, batch),
-                Collections.singletonList(inputId),
-                TypeInformation.of(byte[].class),
-                null,
-                String.format(format, sinkTransformation.getName()),
-                sinkTransformation.getUid() == null
-                        ? null
-                        : String.format(format, sinkTransformation.getUid()),
-                1,
-                1,
-                sinkTransformation,
-                context);
-    }
-
-    private int getParallelism(
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        return sinkTransformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
-                ? sinkTransformation.getParallelism()
-                : context.getStreamGraph().getExecutionConfig().getParallelism();
-    }
-
-    /**
-     * Add a operator to the {@link StreamGraph}.
-     *
-     * @param operatorFactory The operator factory
-     * @param inputs A collection of upstream stream node ids.
-     * @param inTypeInfo The input type information of the operator
-     * @param outTypInfo The output type information of the operator
-     * @param name The name of the operator.
-     * @param uid The uid of the operator.
-     * @param parallelism The parallelism of the operator
-     * @param maxParallelism The max parallelism of the operator
-     * @param sinkTransformation The sink transformation which the operator belongs to
-     * @return The stream node id of the operator
-     */
-    private <IN, OUT> int addOperatorToStreamGraph(
-            StreamOperatorFactory<OUT> operatorFactory,
-            Collection<Integer> inputs,
-            TypeInformation<IN> inTypeInfo,
-            TypeInformation<OUT> outTypInfo,
-            String name,
-            @Nullable String uid,
-            int parallelism,
-            int maxParallelism,
-            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
-            Context context) {
-        final StreamGraph streamGraph = context.getStreamGraph();
-        final String slotSharingGroup = context.getSlotSharingGroup();
-        final int transformationId = Transformation.getNewNodeId();
-
-        streamGraph.addOperator(
-                transformationId,
-                slotSharingGroup,
-                sinkTransformation.getCoLocationGroupKey(),
-                operatorFactory,
-                inTypeInfo,
-                outTypInfo,
-                name);
+        private <T, CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {

Review comment:
       `T` does not need to be declared here, since it is already declared at the class level; the method-level declaration shadows the class-level type parameter.
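       For example (a sketch, keeping the method body unchanged):
       ```java
       // The enclosing SinkExpander<T> already binds T; re-declaring it
       // here would shadow the class-level parameter, so only CommT is
       // needed as a method-level type parameter.
       private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
           // ... body as in the current patch ...
       }
       ```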

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
##########
@@ -71,47 +68,23 @@ public void setChainingStrategy(ChainingStrategy strategy) {
 
     @Override
     public List<Transformation<?>> getTransitivePredecessors() {
-        final List<Transformation<?>> result = Lists.newArrayList();
-        result.add(this);
-        result.addAll(input.getTransitivePredecessors());
-        return result;
+        return Lists.newArrayList();

Review comment:
       This implementation does not seem right? Returning an empty list loses both this transformation and all of its transitive predecessors.
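       I would expect something like the previous implementation, going through the input stream instead of the removed `input` field (a sketch, assuming the input is reachable via the `stream` field's transformation):
       ```java
       @Override
       public List<Transformation<?>> getTransitivePredecessors() {
           // A transformation's transitive predecessors include itself
           // plus everything reachable through its input.
           final List<Transformation<?>> result = Lists.newArrayList();
           result.add(this);
           result.addAll(stream.getTransformation().getTransitivePredecessors());
           return result;
       }
       ```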




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org