You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/17 08:04:14 UTC

[flink] branch release-1.15 updated (311122c -> 3d06468)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 311122c  [hotfix][ci] Try to fix the e2e ci pipeline upgrading the libssl version
     new 31cd737  [FLINK-26613][streaming] Allow setting operator uid hashes for predefined sink operators
     new 3d06468  [FLINK-26613][streaming] Use Flink 1.13 sink committer operator uid pattern

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../datastream/CustomSinkOperatorUidHashes.java    | 130 +++++++++++++++++++++
 .../flink/streaming/api/datastream/DataStream.java |  38 +++++-
 .../streaming/api/datastream/DataStreamSink.java   |  14 ++-
 .../api/transformations/SinkTransformation.java    |  10 +-
 .../translators/SinkTransformationTranslator.java  |  33 +++++-
 .../graph/SinkTransformationTranslatorTest.java    |  53 +++++++++
 .../plan/nodes/exec/common/CommonExecSink.java     |   9 +-
 7 files changed, 275 insertions(+), 12 deletions(-)
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CustomSinkOperatorUidHashes.java

[flink] 02/02: [FLINK-26613][streaming] Use Flink 1.13 sink committer operator uid pattern

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d0646836c149222cf9c2184dd70b83bacc42c16
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Tue Mar 15 13:13:02 2022 +0100

    [FLINK-26613][streaming] Use Flink 1.13 sink committer operator uid pattern
    
    Since there is no dedicated committer operator in Flink 1.14 it is safe
    to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15.
---
 .../translators/SinkTransformationTranslator.java  |  7 ++++---
 .../graph/SinkTransformationTranslatorTest.java    | 23 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 782ccca..0dd6087 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -355,9 +355,10 @@ public class SinkTransformationTranslator<Input, Output>
                 BiConsumer<Transformation<?>, String> setter,
                 @Nullable String transformationName) {
             if (transformationName != null && getter.apply(transformation) != null) {
-                // Use the same uid pattern than for Sink V1
+                // Use the same uid pattern than for Sink V1. We deliberately decided to use the uid
+                // pattern of Flink 1.13 because 1.14 did not have a dedicated committer operator.
                 if (transformationName.equals(COMMITTER_NAME)) {
-                    final String committerFormat = "Sink %s Committer";
+                    final String committerFormat = "Sink Committer: %s";
                     setter.accept(
                             subTransformation,
                             String.format(committerFormat, getter.apply(transformation)));
@@ -369,7 +370,7 @@ public class SinkTransformationTranslator<Input, Output>
                     return;
                 }
 
-                // Use the same uid pattern than for Sink V1
+                // Use the same uid pattern than for Sink V1 in Flink 1.14.
                 if (transformationName.equals(
                         StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
                     final String committerFormat = "Sink %s Global Committer";
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
index 164a8c4..e2086ec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
@@ -293,6 +293,29 @@ public class SinkTransformationTranslatorTest extends TestLogger {
         assertEquals(findGlobalCommitter(streamGraph).getUserHash(), globalCommitterHash);
     }
 
+    /**
+     * When ever you need to change something in this test case please think about possible state
+     * upgrade problems introduced by your changes.
+     */
+    @Test
+    public void testSettingOperatorUids() {
+        final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        src.sinkTo(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build())
+                .name(NAME)
+                .uid(sinkUid);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+        assertEquals(
+                findCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink Committer: %s", sinkUid));
+        assertEquals(
+                findGlobalCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink %s Global Committer", sinkUid));
+    }
+
     private void validateTopology(
             StreamNode src,
             Class<?> srcOutTypeInfo,

[flink] 01/02: [FLINK-26613][streaming] Allow setting operator uid hashes for predefined sink operators

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31cd7376e64cce16808f01eedc70b1e678170781
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Mon Mar 14 10:41:00 2022 +0100

    [FLINK-26613][streaming] Allow setting operator uid hashes for predefined sink operators
    
    Since the topology has changed between Flink 1.14 and 1.15 it might
    happen that stateful upgrades are not possible if no prior operator uids
    were set. With this commit, users can set operator uid hashes for the
    respective operators.
---
 .../datastream/CustomSinkOperatorUidHashes.java    | 130 +++++++++++++++++++++
 .../flink/streaming/api/datastream/DataStream.java |  38 +++++-
 .../streaming/api/datastream/DataStreamSink.java   |  14 ++-
 .../api/transformations/SinkTransformation.java    |  10 +-
 .../translators/SinkTransformationTranslator.java  |  26 +++++
 .../graph/SinkTransformationTranslatorTest.java    |  30 +++++
 .../plan/nodes/exec/common/CommonExecSink.java     |   9 +-
 7 files changed, 248 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CustomSinkOperatorUidHashes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CustomSinkOperatorUidHashes.java
new file mode 100644
index 0000000..ae4a60e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CustomSinkOperatorUidHashes.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class is responsible to hold operator Uid hashes from the common operators of the sink. With
+ * this, users can recover a sink snapshot that did not bind uids to the operator before changing
+ * the topology.
+ */
+@PublicEvolving
+public class CustomSinkOperatorUidHashes {
+
+    /** Default instance providing no custom sink operator hashes. */
+    public static final CustomSinkOperatorUidHashes DEFAULT =
+            CustomSinkOperatorUidHashes.builder().build();
+
+    @Nullable private final String writerUidHash;
+    @Nullable private final String committerUidHash;
+    @Nullable private final String globalCommitterUidHash;
+
+    private CustomSinkOperatorUidHashes(
+            @Nullable String writerUidHash,
+            @Nullable String committerUidHash,
+            @Nullable String globalCommitterUidHash) {
+        this.writerUidHash = writerUidHash;
+        this.committerUidHash = committerUidHash;
+        this.globalCommitterUidHash = globalCommitterUidHash;
+    }
+
+    /**
+     * Creates a builder to construct {@link CustomSinkOperatorUidHashes}.
+     *
+     * @return {@link SinkOperatorUidHashesBuilder}
+     */
+    public static SinkOperatorUidHashesBuilder builder() {
+        return new SinkOperatorUidHashesBuilder();
+    }
+
+    @Internal
+    @Nullable
+    public String getWriterUidHash() {
+        return writerUidHash;
+    }
+
+    @Internal
+    @Nullable
+    public String getCommitterUidHash() {
+        return committerUidHash;
+    }
+
+    @Internal
+    @Nullable
+    public String getGlobalCommitterUidHash() {
+        return globalCommitterUidHash;
+    }
+
+    /** Builder to construct {@link CustomSinkOperatorUidHashes}. */
+    @PublicEvolving
+    public static class SinkOperatorUidHashesBuilder {
+
+        @Nullable String writerUidHash = null;
+        @Nullable String committerUidHash = null;
+        @Nullable String globalCommitterUidHash = null;
+
+        /**
+         * Sets the uid hash of the writer operator used to recover state.
+         *
+         * @param writerUidHash uid hash denoting writer operator
+         * @return {@link SinkOperatorUidHashesBuilder}
+         */
+        public SinkOperatorUidHashesBuilder setWriterUidHash(String writerUidHash) {
+            this.writerUidHash = writerUidHash;
+            return this;
+        }
+
+        /**
+         * Sets the uid hash of the committer operator used to recover state.
+         *
+         * @param committerUidHash uid hash denoting the committer operator
+         * @return {@link SinkOperatorUidHashesBuilder}
+         */
+        public SinkOperatorUidHashesBuilder setCommitterUidHash(String committerUidHash) {
+            this.committerUidHash = committerUidHash;
+            return this;
+        }
+
+        /**
+         * Sets the uid hash of the global committer operator used to recover state.
+         *
+         * @param globalCommitterUidHash uid hash denoting the global committer operator
+         * @return {@link SinkOperatorUidHashesBuilder}
+         */
+        public SinkOperatorUidHashesBuilder setGlobalCommitterUidHash(
+                String globalCommitterUidHash) {
+            this.globalCommitterUidHash = globalCommitterUidHash;
+            return this;
+        }
+
+        /**
+         * Constructs the {@link CustomSinkOperatorUidHashes} with the given uid hashes.
+         *
+         * @return {@link CustomSinkOperatorUidHashes}
+         */
+        public CustomSinkOperatorUidHashes build() {
+            return new CustomSinkOperatorUidHashes(
+                    writerUidHash, committerUidHash, globalCommitterUidHash);
+        }
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 3382b01..b0c3d64 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1250,10 +1250,27 @@ public class DataStream<T> {
      */
     @PublicEvolving
     public DataStreamSink<T> sinkTo(org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
+        return this.sinkTo(sink, CustomSinkOperatorUidHashes.DEFAULT);
+    }
+
+    /**
+     * Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
+     * executed once the {@link StreamExecutionEnvironment#execute()} method is called.
+     *
+     * <p>This method is intended to be used only to recover a snapshot where no uids have been set
+     * before taking the snapshot.
+     *
+     * @param sink The user defined sink.
+     * @return The closed DataStream.
+     */
+    @PublicEvolving
+    public DataStreamSink<T> sinkTo(
+            org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink,
+            CustomSinkOperatorUidHashes customSinkOperatorUidHashes) {
         // read the output type of the input Transform to coax out errors about MissingTypeInfo
         transformation.getOutputType();
 
-        return DataStreamSink.forSinkV1(this, sink);
+        return DataStreamSink.forSinkV1(this, sink, customSinkOperatorUidHashes);
     }
 
     /**
@@ -1265,10 +1282,27 @@ public class DataStream<T> {
      */
     @PublicEvolving
     public DataStreamSink<T> sinkTo(Sink<T> sink) {
+        return this.sinkTo(sink, CustomSinkOperatorUidHashes.DEFAULT);
+    }
+
+    /**
+     * Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
+     * executed once the {@link StreamExecutionEnvironment#execute()} method is called.
+     *
+     * <p>This method is intended to be used only to recover a snapshot where no uids have been set
+     * before taking the snapshot.
+     *
+     * @param customSinkOperatorUidHashes operator hashes to support state binding
+     * @param sink The user defined sink.
+     * @return The closed DataStream.
+     */
+    @PublicEvolving
+    public DataStreamSink<T> sinkTo(
+            Sink<T> sink, CustomSinkOperatorUidHashes customSinkOperatorUidHashes) {
         // read the output type of the input Transform to coax out errors about MissingTypeInfo
         transformation.getOutputType();
 
-        return DataStreamSink.forSink(this, sink);
+        return DataStreamSink.forSink(this, sink, customSinkOperatorUidHashes);
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index f93c2c0..f9d60e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -65,7 +65,10 @@ public class DataStreamSink<T> {
     }
 
     @Internal
-    public static <T> DataStreamSink<T> forSink(DataStream<T> inputStream, Sink<T> sink) {
+    public static <T> DataStreamSink<T> forSink(
+            DataStream<T> inputStream,
+            Sink<T> sink,
+            CustomSinkOperatorUidHashes customSinkOperatorUidHashes) {
         final StreamExecutionEnvironment executionEnvironment =
                 inputStream.getExecutionEnvironment();
         SinkTransformation<T, T> transformation =
@@ -74,15 +77,18 @@ public class DataStreamSink<T> {
                         sink,
                         inputStream.getType(),
                         "Sink",
-                        executionEnvironment.getParallelism());
+                        executionEnvironment.getParallelism(),
+                        customSinkOperatorUidHashes);
         executionEnvironment.addOperator(transformation);
         return new DataStreamSink<>(transformation);
     }
 
     @Internal
     public static <T> DataStreamSink<T> forSinkV1(
-            DataStream<T> inputStream, org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
-        return forSink(inputStream, SinkV1Adapter.wrap(sink));
+            DataStream<T> inputStream,
+            org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink,
+            CustomSinkOperatorUidHashes customSinkOperatorUidHashes) {
+        return forSink(inputStream, SinkV1Adapter.wrap(sink), customSinkOperatorUidHashes);
     }
 
     /** Returns the transformation that contains the actual sink operator of this sink. */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
index 967f364..bb11c26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
@@ -47,6 +48,7 @@ public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<
     private final DataStream<InputT> inputStream;
     private final Sink<InputT> sink;
     private final Transformation<InputT> input;
+    private final CustomSinkOperatorUidHashes customSinkOperatorUidHashes;
 
     private ChainingStrategy chainingStrategy;
 
@@ -55,11 +57,13 @@ public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<
             Sink<InputT> sink,
             TypeInformation<OutputT> outputType,
             String name,
-            int parallelism) {
+            int parallelism,
+            CustomSinkOperatorUidHashes customSinkOperatorUidHashes) {
         super(name, outputType, parallelism);
         this.inputStream = checkNotNull(inputStream);
         this.sink = checkNotNull(sink);
         this.input = inputStream.getTransformation();
+        this.customSinkOperatorUidHashes = checkNotNull(customSinkOperatorUidHashes);
     }
 
     @Override
@@ -92,4 +96,8 @@ public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<
     public Sink<InputT> getSink() {
         return sink;
     }
+
+    public CustomSinkOperatorUidHashes getSinkOperatorsUidHashes() {
+        return customSinkOperatorUidHashes;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 1bdcc50..782ccca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
 import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.TransformationTranslator;
@@ -257,7 +258,10 @@ public class SinkTransformationTranslator<Input, Output>
             List<Transformation<?>> expandedTransformations =
                     transformations.subList(numTransformsBefore, transformations.size());
 
+            final CustomSinkOperatorUidHashes operatorsUidHashes =
+                    transformation.getSinkOperatorsUidHashes();
             for (Transformation<?> subTransformation : expandedTransformations) {
+
                 String subUid = subTransformation.getUid();
                 if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
                     checkState(
@@ -268,6 +272,18 @@ public class SinkTransformationTranslator<Input, Output>
                                     + " has set uid for some operators.");
                 }
 
+                // Set the operator uid hashes to support stateful upgrades without prior uids
+                setOperatorUidHashIfPossible(
+                        subTransformation, WRITER_NAME, operatorsUidHashes.getWriterUidHash());
+                setOperatorUidHashIfPossible(
+                        subTransformation,
+                        COMMITTER_NAME,
+                        operatorsUidHashes.getCommitterUidHash());
+                setOperatorUidHashIfPossible(
+                        subTransformation,
+                        StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
+                        operatorsUidHashes.getGlobalCommitterUidHash());
+
                 concatUid(
                         subTransformation,
                         Transformation::getUid,
@@ -323,6 +339,16 @@ public class SinkTransformationTranslator<Input, Output>
             return result;
         }
 
+        private void setOperatorUidHashIfPossible(
+                Transformation<?> transformation,
+                String writerName,
+                @Nullable String operatorUidHash) {
+            if (operatorUidHash == null || !transformation.getName().equals(writerName)) {
+                return;
+            }
+            transformation.setUidHash(operatorUidHash);
+        }
+
         private void concatUid(
                 Transformation<?> subTransformation,
                 Function<Transformation<?>, String> getter,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
index 44189e0..164a8c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -47,6 +48,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 /** Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. */
 @RunWith(Parameterized.class)
@@ -263,6 +265,34 @@ public class SinkTransformationTranslatorTest extends TestLogger {
                 is(ChainingStrategy.NEVER));
     }
 
+    @Test
+    public void testSettingOperatorUidHash() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
+        final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
+        final String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37";
+        final CustomSinkOperatorUidHashes operatorsUidHashes =
+                CustomSinkOperatorUidHashes.builder()
+                        .setWriterUidHash(writerHash)
+                        .setCommitterUidHash(committerHash)
+                        .setGlobalCommitterUidHash(globalCommitterHash)
+                        .build();
+        src.sinkTo(
+                        TestSink.newBuilder()
+                                .setDefaultCommitter()
+                                .setDefaultGlobalCommitter()
+                                .build(),
+                        operatorsUidHashes)
+                .name(NAME);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+
+        assertEquals(findWriter(streamGraph).getUserHash(), writerHash);
+        assertEquals(findCommitter(streamGraph).getUserHash(), committerHash);
+        assertEquals(findGlobalCommitter(streamGraph).getUserHash(), globalCommitterHash);
+    }
+
     private void validateTopology(
             StreamNode src,
             Class<?> srcOutTypeInfo,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index ce657c9..a5cf738 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -491,7 +492,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
             final Transformation<?> transformation =
                     DataStreamSink.forSinkV1(
-                                    dataStream, ((SinkProvider) runtimeProvider).createSink())
+                                    dataStream,
+                                    ((SinkProvider) runtimeProvider).createSink(),
+                                    CustomSinkOperatorUidHashes.DEFAULT)
                             .getTransformation();
             transformation.setParallelism(sinkParallelism);
             sinkMeta.fill(transformation);
@@ -503,7 +506,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
             final Transformation<?> transformation =
                     DataStreamSink.forSink(
-                                    dataStream, ((SinkV2Provider) runtimeProvider).createSink())
+                                    dataStream,
+                                    ((SinkV2Provider) runtimeProvider).createSink(),
+                                    CustomSinkOperatorUidHashes.DEFAULT)
                             .getTransformation();
             transformation.setParallelism(sinkParallelism);
             sinkMeta.fill(transformation);