You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/10 05:26:03 UTC

[incubator-seatunnel] branch api-draft updated: [Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine. (#1839)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 0db318bc [Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.   (#1839)
0db318bc is described below

commit 0db318bc93858d3a7e74f6633e425d8593df91f2
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Tue May 10 13:25:57 2022 +0800

    [Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.   (#1839)
    
    * Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.
---
 .../DefaultSerializer.java}                        |  17 ++--
 ...ommitter.java => DefaultSinkWriterContext.java} |  30 ++++--
 .../java/org/apache/seatunnel/api/sink/Sink.java   |   5 +-
 .../api/sink/SinkAggregatedCommitter.java          |   7 +-
 .../apache/seatunnel/api/sink/SinkCommitter.java   |   4 +-
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  24 +++--
 .../serialization/SerializerConverter.java         |  12 +--
 .../sink/SinkAggregatedCommitterConverter.java     |  11 +--
 .../translation/sink/SinkCommitterConverter.java   |  12 +--
 .../seatunnel/translation/sink/SinkConverter.java  |  13 ++-
 .../translation/sink/SinkWriterConverter.java      |  11 +--
 .../seatunnel-translation-flink/pom.xml            |   7 ++
 .../FlinkSimpleVersionedSerializer.java            |  30 ++++--
 .../translation/flink/sink/FlinkCommitter.java     |  23 ++++-
 .../flink/sink/FlinkCommitterConverter.java        |  16 ++--
 .../flink/sink/FlinkGlobalCommitter.java           |  67 +++++++++++++
 .../flink/sink/FlinkGlobalCommitterConverter.java  |  17 ++--
 .../FlinkSimpleVersionedSerializerConverter.java   |  17 ++--
 .../translation/flink/sink/FlinkSink.java          | 104 +++++++++++++++++++++
 .../translation/flink/sink/FlinkSinkConverter.java |  17 ++--
 .../translation/flink/sink/FlinkSinkWriter.java    |  64 +++++++++++++
 .../flink/sink/FlinkSinkWriterConverter.java       |  16 ++--
 22 files changed, 410 insertions(+), 114 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
similarity index 66%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index 1f38efa3..ec213c69 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -15,15 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.serialization;
 
 import java.io.IOException;
-import java.util.List;
+import java.nio.charset.StandardCharsets;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public class DefaultSerializer implements Serializer<String> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+    @Override
+    public byte[] serialize(String obj) throws IOException {
+        return obj.getBytes(StandardCharsets.UTF_8);
+    }
 
-    void abort() throws Exception;
+    @Override
+    public String deserialize(byte[] serialized) throws IOException {
+        return new String(serialized, StandardCharsets.UTF_8); // decode with the same charset serialize() encodes with
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
similarity index 54%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 2dff2525..aeba2dd1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,14 +17,32 @@
 
 package org.apache.seatunnel.api.sink;
 
-import java.io.IOException;
-import java.util.List;
+import java.util.Map;
 
-public interface SinkCommitter<CommitInfoT> {
+public class DefaultSinkWriterContext implements SinkWriter.Context {
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+    private final Map<String, String> configuration;
+    private final int subtask;
+    private final int parallelism;
 
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    public DefaultSinkWriterContext(Map<String, String> configuration, int subtask, int parallelism) {
+        this.configuration = configuration;
+        this.subtask = subtask;
+        this.parallelism = parallelism;
+    }
 
-    void abort() throws Exception;
+    @Override
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return subtask;
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return parallelism;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
index 2c1fedef..cd7ba6ee 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -26,9 +26,10 @@ import java.util.Optional;
 
 public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
-    SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException;
+    SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
 
-    default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+    default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(SinkWriter.Context context,
+                                                              List<StateT> states) throws IOException {
         return createWriter(context);
     }
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index 1f38efa3..fcca1c5e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -22,8 +22,9 @@ import java.util.List;
 
 public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
 
-    void abort() throws Exception;
+    void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
+
+    void close() throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 2dff2525..35fbdef3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -22,9 +22,7 @@ import java.util.List;
 
 public interface SinkCommitter<CommitInfoT> {
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
-
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
 
     void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 933bfeb0..4e22dd5b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -22,30 +22,36 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface SinkWriter<T, StateT> {
-    void write(T element) throws IOException, InterruptedException;
+public interface SinkWriter<T, CommitInfoT, StateT> {
+
+    void write(T element) throws IOException;
+
+    /**
+     * Prepares the commit; called before {@link #snapshotState()}.
+     *
+     * @return the commit info that needs to be committed
+     */
+    CommitInfoT prepareCommit() throws IOException;
 
     /**
      * @return The writer's state.
      * @throws IOException if fail to snapshot writer's state.
-     * @deprecated implement {@link #snapshotState(long)}
      */
     default List<StateT> snapshotState() throws IOException {
         return Collections.emptyList();
     }
 
     /**
-     * @return The writer's state.
-     * @throws IOException if fail to snapshot writer's state.
+     * Called when the SinkWriter is closed.
+     *
+     * @throws IOException if close failed
      */
-    default List<StateT> snapshotState(long checkpointId) throws IOException {
-        return snapshotState();
-    }
+    void close() throws IOException;
 
     interface Context {
 
         /**
-         * Gets the configuration with which Flink was started.
+         * Gets the configuration with which the job was started.
          */
         Map<String, String> getConfiguration();
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
index 1f38efa3..c8abb22f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
@@ -15,15 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.serialization;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.serialization.Serializer;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SerializerConverter<T> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
-
-    void abort() throws Exception;
+    T convert(Serializer<?> serializer);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
index 1f38efa3..39aae0e5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
@@ -15,15 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkAggregatedCommitterConverter<T> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+    T convert(SinkAggregatedCommitter<?, ?> sinkCommitter);
 
-    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
index 1f38efa3..900d44d0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
@@ -15,15 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkCommitter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkCommitterConverter<T> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
-
-    void abort() throws Exception;
+    T convert(SinkCommitter<?> sinkCommitter);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
index 1f38efa3..dcc1ea21 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.Sink;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import java.util.Map;
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+public interface SinkConverter<T> {
+
+    T convert(Sink<?, ?, ?, ?> sink, Map<String, String> configuration);
 
-    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
index 1f38efa3..6be05f14 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
@@ -15,15 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkWriter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkWriterConverter<T> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+    T convert(SinkWriter<?, ?, ?> sinkWriter);
 
-    void abort() throws Exception;
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index e996acdc..114d4134 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -43,6 +43,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 52%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
index 2dff2525..98042c59 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
@@ -15,16 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
-import java.util.List;
 
-public interface SinkCommitter<CommitInfoT> {
+public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
+
+    private final Serializer<T> serializer;
+
+    public FlinkSimpleVersionedSerializer(Serializer<T> serializer) {
+        this.serializer = serializer;
+    }
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+    @Override
+    public int getVersion() {
+        return 0;
+    }
 
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    @Override
+    public byte[] serialize(T obj) throws IOException {
+        return serializer.serialize(obj);
+    }
 
-    void abort() throws Exception;
+    @Override
+    public T deserialize(int version, byte[] serialized) throws IOException {
+        return serializer.deserialize(serialized);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 2dff2525..296bccb8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -15,16 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.flink.api.connector.sink.Committer;
 
 import java.io.IOException;
 import java.util.List;
 
-public interface SinkCommitter<CommitInfoT> {
+public class FlinkCommitter<CommT> implements Committer<CommT> {
+
+    private final SinkCommitter<CommT> sinkCommitter;
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+    FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
+        this.sinkCommitter = sinkCommitter;
+    }
 
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    @Override
+    public List<CommT> commit(List<CommT> committables) throws IOException {
+        return sinkCommitter.commit(committables);
+    }
 
-    void abort() throws Exception;
+    @Override
+    public void close() throws Exception {
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
similarity index 63%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
index 2dff2525..981e1376 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.translation.sink.SinkCommitterConverter;
 
-public interface SinkCommitter<CommitInfoT> {
+import org.apache.flink.api.connector.sink.Committer;
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+public class FlinkCommitterConverter<CommT> implements SinkCommitterConverter<Committer<CommT>> {
 
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    @Override
+    public Committer<CommT> convert(SinkCommitter<?> sinkCommitter) {
+        return new FlinkCommitter(sinkCommitter);
+    }
 
-    void abort() throws Exception;
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
new file mode 100644
index 00000000..58c8caa2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommT, GlobalCommT> {
+
+    private SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
+
+    FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
+        this.aggregatedCommitter = aggregatedCommitter;
+    }
+
+    @Override
+    public List<GlobalCommT> filterRecoveredCommittables(List globalCommittables) throws IOException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public GlobalCommT combine(List<CommT> committables) throws IOException {
+        // TODO add combine logic
+        return null;
+    }
+
+    @Override
+    public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
+        // Each global committable is treated as a List of committables (see the cast below);
+        // flatten into a fresh list rather than clearing and refilling the caller's list.
+        List<GlobalCommT> all = new ArrayList<>();
+        for (GlobalCommT c : globalCommittables) {
+            all.addAll((List<GlobalCommT>) c);
+        }
+        return aggregatedCommitter.commit(all);
+    }
+
+    @Override
+    public void endOfInput() throws IOException {
+    }
+
+    @Override
+    public void close() throws Exception {
+        aggregatedCommitter.close();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
index 1f38efa3..fd51c37c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.translation.sink.SinkAggregatedCommitterConverter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import org.apache.flink.api.connector.sink.GlobalCommitter;
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
-
-    void abort() throws Exception;
+public class FlinkGlobalCommitterConverter<CommT, GlobalCommT> implements SinkAggregatedCommitterConverter<GlobalCommitter<CommT, GlobalCommT>> {
+    @Override
+    public GlobalCommitter<CommT, GlobalCommT> convert(SinkAggregatedCommitter<?, ?> sinkCommitter) {
+        return new FlinkGlobalCommitter(sinkCommitter);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
similarity index 56%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
index 1f38efa3..a2160165 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
@@ -15,15 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
+import org.apache.seatunnel.translation.serialization.SerializerConverter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
+public class FlinkSimpleVersionedSerializerConverter<T> implements SerializerConverter<SimpleVersionedSerializer<T>> {
 
-    void abort() throws Exception;
+    @Override
+    public SimpleVersionedSerializer<T> convert(Serializer<?> serializer) {
+        return new FlinkSimpleVersionedSerializer(serializer);
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
new file mode 100644
index 00000000..3d693714
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Flink {@link Sink} implementation that delegates to a SeaTunnel
+ * {@link org.apache.seatunnel.api.sink.Sink}. The writer, committers and serializers created
+ * by the SeaTunnel sink are converted into their Flink counterparts by the converter classes
+ * of this package.
+ *
+ * @param <InputT> type of the elements written by the sink
+ * @param <CommT> type of the commit information produced by a writer
+ * @param <WriterStateT> type of the writer state
+ * @param <GlobalCommT> type of the aggregated (global) commit information
+ */
+public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+
+    private final org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink;
+    private final Map<String, String> configuration;
+
+    /**
+     * @param sink          the SeaTunnel sink every call is delegated to
+     * @param configuration configuration handed to the SeaTunnel writer context
+     */
+    FlinkSink(org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink,
+              Map<String, String> configuration) {
+        this.sink = sink;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Creates the Flink writer by converting the writer of the underlying SeaTunnel sink.
+     *
+     * @param context Flink initialization context; not used yet (see the TODO below)
+     * @param states  writer states to restore from; {@code null} or empty creates a fresh writer
+     * @return the converted writer, restored from {@code states} when any are given
+     * @throws IOException declared by the Flink {@link Sink} contract
+     */
+    @Override
+    public SinkWriter<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states) throws IOException {
+        // TODO add subtask and parallelism.
+        // NOTE(review): the two 0 arguments are presumably placeholders for those values.
+        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+                new DefaultSinkWriterContext(configuration, 0, 0);
+
+        FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new FlinkSinkWriterConverter<>();
+
+        if (states == null || states.isEmpty()) {
+            return converter.convert(sink.createWriter(stContext));
+        } else {
+            return converter.convert(sink.restoreWriter(stContext, states));
+        }
+    }
+
+    /**
+     * Converts the committer of the SeaTunnel sink, if it provides one.
+     *
+     * @return the converted committer, or an empty {@link Optional} when the sink has none
+     * @throws IOException declared by the Flink {@link Sink} contract
+     */
+    @Override
+    public Optional<Committer<CommT>> createCommitter() throws IOException {
+        FlinkCommitterConverter<CommT> converter = new FlinkCommitterConverter<>();
+        Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
+        return committer.map(converter::convert);
+    }
+
+    /**
+     * Converts the aggregated committer of the SeaTunnel sink, if it provides one.
+     *
+     * @return the converted global committer, or an empty {@link Optional} when the sink has none
+     * @throws IOException declared by the Flink {@link Sink} contract
+     */
+    @Override
+    public Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException {
+        FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new FlinkGlobalCommitterConverter<>();
+        Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer = sink.createAggregatedCommitter();
+        return committer.map(converter::convert);
+    }
+
+    /**
+     * Converts the commit-info serializer of the SeaTunnel sink, if it provides one.
+     *
+     * @return the converted serializer, or an empty {@link Optional} when the sink has none
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer() {
+        // Typed converter inside map(): no raw type, and the sink getter is called only once.
+        return sink.getCommitInfoSerializer()
+                .map(serializer -> new FlinkSimpleVersionedSerializerConverter<CommT>().convert(serializer));
+    }
+
+    /**
+     * Converts the aggregated-commit-info serializer of the SeaTunnel sink, if it provides one.
+     *
+     * @return the converted serializer, or an empty {@link Optional} when the sink has none
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
+        return sink.getAggregatedCommitInfoSerializer()
+                .map(serializer -> new FlinkSimpleVersionedSerializerConverter<GlobalCommT>().convert(serializer));
+    }
+
+    /**
+     * Converts the writer-state serializer of the SeaTunnel sink, if it provides one.
+     *
+     * @return the converted serializer, or an empty {@link Optional} when the sink has none
+     */
+    @Override
+    public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
+        return sink.getWriterStateSerializer()
+                .map(serializer -> new FlinkSimpleVersionedSerializerConverter<WriterStateT>().convert(serializer));
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
similarity index 63%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2dff2525..02c2dda3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.translation.sink.SinkConverter;
 
-public interface SinkCommitter<CommitInfoT> {
+import java.util.Map;
 
-    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+/**
+ * {@link SinkConverter} implementation for the Flink engine: wraps a SeaTunnel {@link Sink}
+ * in a {@link FlinkSink}.
+ */
+public class FlinkSinkConverter implements SinkConverter<org.apache.flink.api.connector.sink.Sink> {
 
-    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+    /**
+     * Wraps the given SeaTunnel sink and its configuration in a new {@link FlinkSink}.
+     *
+     * @param sink          the SeaTunnel sink to adapt
+     * @param configuration configuration map passed on to the {@link FlinkSink} constructor
+     * @return the Flink sink constructed around {@code sink}
+     */
+    @Override
+    public org.apache.flink.api.connector.sink.Sink convert(Sink<?, ?, ?, ?> sink, Map<String, String> configuration) {
 
-    void abort() throws Exception;
+        // NOTE(review): FlinkSink is instantiated as a raw type here, so no generic checks
+        // apply to the wildcard-typed sink.
+        return new FlinkSink(sink, configuration);
+
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
new file mode 100644
index 00000000..a1dc6e8a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Flink {@link SinkWriter} that forwards every call to a wrapped SeaTunnel sink writer.
+ * Incoming Flink {@link Row} elements are converted through {@link FlinkRowSerialization}
+ * before they reach the wrapped writer.
+ *
+ * @param <InputT> element type received from Flink; only {@link Row} instances are accepted
+ * @param <CommT> type of the commit information
+ * @param <WriterStateT> type of the writer state
+ */
+public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, WriterStateT> {
+
+    private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
+    private final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+
+    /**
+     * @param sinkWriter the SeaTunnel writer all calls are delegated to
+     */
+    FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter) {
+        this.sinkWriter = sinkWriter;
+    }
+
+    /**
+     * Converts a Flink {@link Row} and writes it through the wrapped writer.
+     *
+     * @param element the element to write; must be a {@link Row}
+     * @param context Flink writer context, not used by this implementation
+     * @throws InvalidClassException if {@code element} is not a {@link Row}
+     * @throws IOException if the conversion or the wrapped writer fails
+     */
+    @Override
+    public void write(InputT element, Context context) throws IOException {
+        // Reject anything that is not a Flink Row before touching the wrapped writer.
+        if (!(element instanceof Row)) {
+            throw new InvalidClassException("only support Flink Row at now, the element Class is " + element.getClass());
+        }
+        SeaTunnelRow seaTunnelRow = rowSerialization.deserialize((Row) element);
+        sinkWriter.write(seaTunnelRow);
+    }
+
+    /**
+     * Asks the wrapped writer for its commit information.
+     *
+     * @param flush Flink's flush flag; it is not forwarded to the wrapped writer
+     * @return a single-element list holding the wrapped writer's commit information
+     * @throws IOException if the wrapped writer fails
+     */
+    @Override
+    public List<CommT> prepareCommit(boolean flush) throws IOException {
+        CommT commitInfo = sinkWriter.prepareCommit();
+        return Collections.singletonList(commitInfo);
+    }
+
+    /**
+     * @return the state snapshot reported by the wrapped writer
+     * @throws IOException if the wrapped writer fails
+     */
+    @Override
+    public List<WriterStateT> snapshotState() throws IOException {
+        List<WriterStateT> writerStates = sinkWriter.snapshotState();
+        return writerStates;
+    }
+
+    /**
+     * Closes the wrapped writer.
+     *
+     * @throws Exception if closing the wrapped writer fails
+     */
+    @Override
+    public void close() throws Exception {
+        sinkWriter.close();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
similarity index 59%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
index 1f38efa3..9520b27d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+/**
+ * {@link SinkWriterConverter} implementation for the Flink engine: wraps a SeaTunnel
+ * {@link SinkWriter} in a {@link FlinkSinkWriter}.
+ *
+ * @param <InputT> element type of the returned Flink writer
+ * @param <CommT> committable type of the returned Flink writer
+ * @param <WriterStateT> writer state type of the returned Flink writer
+ */
+public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
-            throws IOException, InterruptedException;
-
-    void abort() throws Exception;
+    /**
+     * Wraps the given SeaTunnel writer in a new {@link FlinkSinkWriter}.
+     *
+     * @param sinkWriter the SeaTunnel writer to adapt
+     * @return the Flink writer constructed around {@code sinkWriter}
+     */
+    @Override
+    public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) {
+        // NOTE(review): FlinkSinkWriter is instantiated as a raw type here, so the compiler
+        // does not check the wildcard-typed writer against the SeaTunnelRow-typed constructor.
+        return new FlinkSinkWriter(sinkWriter);
+    }
 }