You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 03:42:56 UTC

[incubator-seatunnel] branch api-draft updated: [Api-draft] Support Spark Sink convert to SeaTunnel (#1863)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 48423a92 [Api-draft] Support Spark Sink convert to SeaTunnel (#1863)
48423a92 is described below

commit 48423a92138838fa3dec7ce6bb05f338d63ec4df
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 13 11:42:51 2022 +0800

    [Api-draft] Support Spark Sink convert to SeaTunnel (#1863)
    
    * Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.
    * Support Spark sink translation
    * change license
---
 pom.xml                                            |  7 ++
 .../seatunnel/api/serialization/Serializer.java    |  3 +-
 .../api/sink/SinkAggregatedCommitter.java          |  5 +-
 .../apache/seatunnel/api/sink/SinkCommitter.java   |  5 +-
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  3 +-
 seatunnel-common/pom.xml                           |  5 ++
 .../seatunnel/common/utils/SerializationUtils.java | 37 +++++----
 seatunnel-dist/release-docs/LICENSE                |  4 -
 .../flink/sink/FlinkGlobalCommitter.java           | 10 +--
 .../spark/serialization/SparkRowSerialization.java | 10 +++
 .../spark/sink/AbstractSparkWriterConverter.java   | 40 ++++++++++
 .../spark/sink/SparkDataSourceWriter.java          | 90 ++++++++++++++++++++++
 .../spark/sink/SparkDataSourceWriterConverter.java | 43 +++++++++++
 .../translation/spark/sink/SparkDataWriter.java    | 72 +++++++++++++++++
 .../spark/sink/SparkDataWriterFactory.java         | 51 ++++++++++++
 .../translation/spark/sink/SparkSink.java          | 86 +++++++++++++++++++++
 .../translation/spark/sink/SparkSinkConverter.java | 37 +++++++++
 .../translation/spark/sink/SparkSinkInjector.java  | 41 ++++++++++
 .../translation/spark/sink/SparkStreamWriter.java  | 77 ++++++++++++++++++
 .../spark/sink/SparkStreamWriterConverter.java     | 43 +++++++++++
 .../spark/sink/SparkWriterCommitMessage.java       | 21 +++--
 tools/dependencies/known-dependencies.txt          |  4 -
 22 files changed, 646 insertions(+), 48 deletions(-)

diff --git a/pom.xml b/pom.xml
index ad0cfdc4..9a5382b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,7 @@
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
         <spark.scope>provided</spark.scope>
         <flink.scope>provided</flink.scope>
+        <codec.version>1.13</codec.version>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
         <httpcore-nio.version>4.4.4</httpcore-nio.version>
@@ -295,6 +296,12 @@
                 <version>${spoiwo.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>${codec.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.typesafe.play</groupId>
                 <artifactId>play-mailer_${scala.binary.version}</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index 61703486..fe619944 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -18,8 +18,9 @@
 package org.apache.seatunnel.api.serialization;
 
 import java.io.IOException;
+import java.io.Serializable;
 
-public interface Serializer<T> {
+public interface Serializer<T> extends Serializable{
 
     /**
      * Serializes the given object.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index fcca1c5e..b9d386a2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -18,12 +18,15 @@
 package org.apache.seatunnel.api.sink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
     List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
 
+    AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
+
     void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
 
     void close() throws IOException;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 35fbdef3..f734f12a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -18,11 +18,12 @@
 package org.apache.seatunnel.api.sink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
-public interface SinkCommitter<CommitInfoT> {
+public interface SinkCommitter<CommitInfoT> extends Serializable {
 
     List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
 
-    void abort() throws Exception;
+    void abort() throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4e22dd5b..eeaa66b7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -18,11 +18,12 @@
 package org.apache.seatunnel.api.sink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface SinkWriter<T, CommitInfoT, StateT> {
+public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable{
 
     void write(T element) throws IOException;
 
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index c6033308..99e1bb19 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -44,6 +44,11 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
similarity index 54%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 61703486..0b6e9c52 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -15,27 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.serialization;
+package org.apache.seatunnel.common.utils;
 
-import java.io.IOException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 
-public interface Serializer<T> {
+import java.io.Serializable;
 
-    /**
-     * Serializes the given object.
-     *
-     * @param obj The object to serialize.
-     * @return The serialized data (bytes).
-     * @throws IOException Thrown, if the serialization fails.
-     */
-    byte[] serialize(T obj) throws IOException;
+public class SerializationUtils {
 
-    /**
-     * De-serializes the given data (bytes).
-     *
-     * @param serialized The serialized data
-     * @return The deserialized object
-     * @throws IOException Thrown, if the deserialization fails.
-     */
-    T deserialize(byte[] serialized) throws IOException;
+    public static String objectToString(Serializable obj) {
+        if (obj != null) {
+            return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
+        }
+        return null;
+    }
+
+    public static <T extends Serializable> T stringToObject(String str) {
+        if (StringUtils.isNotEmpty(str)) {
+            return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
+        }
+        return null;
+    }
 }
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 9dfb0fca..636eb0d1 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -343,8 +343,6 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons BeanUtils (commons-beanutils:commons-beanutils:1.9.3 - https://commons.apache.org/proper/commons-beanutils/)
      (Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.3.1 - http://commons.apache.org/proper/commons-cli/)
      (Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.4 - http://commons.apache.org/proper/commons-cli/)
-     (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.10 - http://commons.apache.org/proper/commons-codec/)
-     (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.11 - http://commons.apache.org/proper/commons-codec/)
      (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
      (Apache License, Version 2.0) Apache Commons Collections (commons-collections:commons-collections:3.2.2 - http://commons.apache.org/collections/)
      (Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.2 - http://commons.apache.org/proper/commons-collections/)
@@ -616,7 +614,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Apache Avro IPC (org.apache.avro:avro-ipc:1.8.2 - http://avro.apache.org)
      (The Apache Software License, Version 2.0) Apache Avro Mapred API (org.apache.avro:avro-mapred:1.8.2 - http://avro.apache.org/avro-mapred)
      (The Apache Software License, Version 2.0) Apache Commons CSV (org.apache.commons:commons-csv:1.0 - http://commons.apache.org/proper/commons-csv/)
-     (The Apache Software License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.9 - http://commons.apache.org/proper/commons-codec/)
      (The Apache Software License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.8.1 - http://commons.apache.org/proper/commons-compress/)
      (The Apache Software License, Version 2.0) Apache Commons DBCP (org.apache.commons:commons-dbcp2:2.0.1 - http://commons.apache.org/proper/commons-dbcp/)
      (The Apache Software License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.3.2 - http://commons.apache.org/proper/commons-lang/)
@@ -673,7 +670,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) ClassMate (com.fasterxml:classmate:1.1.0 - http://github.com/cowtowncoder/java-classmate)
      (The Apache Software License, Version 2.0) Commons BeanUtils Core (commons-beanutils:commons-beanutils-core:1.8.0 - http://commons.apache.org/beanutils/)
      (The Apache Software License, Version 2.0) Commons CLI (commons-cli:commons-cli:1.2 - http://commons.apache.org/cli/)
-     (The Apache Software License, Version 2.0) Commons Codec (commons-codec:commons-codec:1.7 - http://commons.apache.org/codec/)
      (The Apache Software License, Version 2.0) Commons Compress (org.apache.commons:commons-compress:1.4.1 - http://commons.apache.org/compress/)
      (The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.6 - http://commons.apache.org/${pom.artifactId.substring(8)}/)
      (The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.7 - http://commons.apache.org/configuration/)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index cd0ef171..7c39cf24 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -41,18 +40,11 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter
 
     @Override
     public GlobalCommT combine(List<CommT> committables) throws IOException {
-        // TODO add combine logic
-        return null;
+        return aggregatedCommitter.combine(committables);
     }
 
     @Override
     public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
-        List<GlobalCommT> all = new ArrayList<>();
-        globalCommittables.forEach(c -> {
-            all.addAll((List) c);
-        });
-        globalCommittables.clear();
-        globalCommittables.addAll(all);
         return aggregatedCommitter.commit(globalCommittables);
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
index 5eb635c6..5d9aa995 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 
@@ -40,4 +42,12 @@ public class SparkRowSerialization implements RowSerialization<Row> {
         }
         return new SeaTunnelRow(fields);
     }
+
+    public SeaTunnelRow deserialize(StructType schema, InternalRow engineRow) throws IOException {
+        Object[] fields = new Object[engineRow.numFields()];
+        for (int i = 0; i < engineRow.numFields(); i++) {
+            fields[i] = engineRow.get(i, schema.apply(i).dataType());
+        }
+        return new SeaTunnelRow(fields);
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java
new file mode 100644
index 00000000..ed50d2bf
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public abstract class AbstractSparkWriterConverter {
+
+    protected final SinkCommitter<?> sinkCommitter;
+    protected final SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter;
+    protected final StructType schema;
+
+    AbstractSparkWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
+                                 @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
+                                 StructType schema) {
+        this.sinkCommitter = sinkCommitter;
+        this.sinkAggregatedCommitter = sinkAggregatedCommitter;
+        this.schema = schema;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
new file mode 100644
index 00000000..dbf813b8
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT> implements DataSourceWriter {
+
+    private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+    @Nullable
+    private final SinkCommitter<CommitInfoT> sinkCommitter;
+    @Nullable
+    private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter;
+    private final StructType schema;
+
+    SparkDataSourceWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+                          @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
+                          @Nullable SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter,
+                          StructType schema) {
+        this.sinkWriter = sinkWriter;
+        this.sinkCommitter = sinkCommitter;
+        this.sinkAggregatedCommitter = sinkAggregatedCommitter;
+        this.schema = schema;
+    }
+
+    @Override
+    public DataWriterFactory<InternalRow> createWriterFactory() {
+        return new SparkDataWriterFactory<>(sinkWriter, sinkCommitter, schema);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+        if (sinkAggregatedCommitter != null) {
+            try {
+                sinkAggregatedCommitter.commit(combineCommitMessage(messages));
+            } catch (IOException e) {
+                throw new RuntimeException("commit failed in driver", e);
+            }
+        }
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+        if (sinkAggregatedCommitter != null) {
+            try {
+                sinkAggregatedCommitter.abort(combineCommitMessage(messages));
+            } catch (Exception e) {
+                throw new RuntimeException("abort failed in driver", e);
+            }
+        }
+    }
+
+    private List<AggregatedCommitInfoT> combineCommitMessage(WriterCommitMessage[] messages) {
+        return Collections.singletonList(sinkAggregatedCommitter.combine(
+                Arrays.stream(messages).map(m -> ((SparkWriterCommitMessage<CommitInfoT>) m).getMessage())
+                        .collect(Collectors.toList())));
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java
new file mode 100644
index 00000000..83e09b58
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public class SparkDataSourceWriterConverter extends AbstractSparkWriterConverter
+        implements SinkWriterConverter<DataSourceWriter> {
+
+    SparkDataSourceWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
+                                   @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
+                                   StructType schema) {
+        super(sinkCommitter, sinkAggregatedCommitter, schema);
+    }
+
+    @Override
+    public DataSourceWriter convert(SinkWriter<?, ?, ?> sinkWriter) {
+        return new SparkDataSourceWriter(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
new file mode 100644
index 00000000..e17d99e4
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.spark.serialization.SparkRowSerialization;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<InternalRow> {
+
+    private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+
+    @Nullable
+    private final SinkCommitter<CommitInfoT> sinkCommitter;
+    private final StructType schema;
+    private final SparkRowSerialization rowSerialization = new SparkRowSerialization();
+
+    SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+                    SinkCommitter<CommitInfoT> sinkCommitter,
+                    StructType schema) {
+        this.sinkWriter = sinkWriter;
+        this.sinkCommitter = sinkCommitter;
+        this.schema = schema;
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+        sinkWriter.write(rowSerialization.deserialize(schema, record));
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+        CommitInfoT commitInfo = sinkWriter.prepareCommit();
+        if (sinkCommitter != null) {
+            sinkCommitter.commit(Collections.singletonList(commitInfo));
+        }
+        return new SparkWriterCommitMessage<>(commitInfo);
+    }
+
+    @Override
+    public void abort() throws IOException {
+        if (sinkCommitter != null) {
+            sinkCommitter.abort();
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
new file mode 100644
index 00000000..4c8e239c
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFactory<InternalRow> {
+
+    private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+    @Nullable
+    private final SinkCommitter<CommitInfoT> sinkCommitter;
+    private final StructType schema;
+
+    SparkDataWriterFactory(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+                           @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
+                           StructType schema) {
+        this.sinkWriter = sinkWriter;
+        this.sinkCommitter = sinkCommitter;
+        this.schema = schema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+        // TODO use partitionID, taskId, epochId information.
+        return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
new file mode 100644
index 00000000..5a07efb2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public class SparkSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
+        StreamWriteSupport, DataSourceV2 {
+
+    private SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+    private Map<String, String> configuration;
+
+    private void init(DataSourceOptions options) {
+        if (sink == null) {
+            this.sink = SerializationUtils.stringToObject(
+                    options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
+                            "class string in DataSourceOptions")));
+            this.configuration = SerializationUtils.stringToObject(
+                    options.get("configuration").orElseThrow(() -> new IllegalArgumentException("can not " +
+                            "find configuration class string in DataSourceOptions")));
+        }
+    }
+
+    @Override
+    public StreamWriter createStreamWriter(String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
+
+        init(options);
+        // TODO add subtask and parallelism.
+        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+                new DefaultSinkWriterContext(configuration, 0, 0);
+
+        try {
+            return new SparkStreamWriterConverter(sink.createCommitter().orElse(null),
+                    sink.createAggregatedCommitter().orElse(null), schema).convert(sink.createWriter(stContext));
+        } catch (IOException e) {
+            throw new RuntimeException("find error when createStreamWriter", e);
+        }
+    }
+
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
+
+        init(options);
+        // TODO add subtask and parallelism.
+        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+                new DefaultSinkWriterContext(configuration, 0, 0);
+
+        try {
+            return Optional.of(new SparkDataSourceWriterConverter(sink.createCommitter().orElse(null),
+                    sink.createAggregatedCommitter().orElse(null), schema).convert(sink.createWriter(stContext)));
+        } catch (IOException e) {
+            throw new RuntimeException("find error when createStreamWriter", e);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java
new file mode 100644
index 00000000..d7f01474
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.translation.sink.SinkConverter;
+
+import org.apache.spark.sql.sources.v2.WriteSupport;
+
+import java.util.Map;
+
+public class SparkSinkConverter<I, StateT, CommitInfoT, AggregatedCommitInfoT>
+        implements SinkConverter<SeaTunnelSink<I, StateT, CommitInfoT, AggregatedCommitInfoT>, WriteSupport> {
+
+    @Override
+    public WriteSupport convert(SeaTunnelSink<I, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+                                Map<String, String> configuration) {
+        throw new UnsupportedOperationException("Do not use SinkConverter to convert SeaTunnel Sink in " +
+                "Spark engine. Because Spark use refactor and SPI to load sink class. Use SparkSinkInjector" +
+                " instead.");
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
new file mode 100644
index 00000000..8aeb4537
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+
+import java.util.HashMap;
+
+public class SparkSinkInjector {
+
+    private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
+
+    public static DataStreamWriter<Row> inject(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+                                               HashMap<String, String> configuration) {
+        return dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+                .option("configuration", SerializationUtils.objectToString(configuration)).option("sink",
+                        SerializationUtils.objectToString(sink));
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
new file mode 100644
index 00000000..42e3c8e1
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
/**
 * Streaming variant of the SeaTunnel Spark writer: adapts the batch-oriented
 * {@link SparkDataSourceWriter} to Spark's epoch-based {@link StreamWriter} contract.
 */
public class SparkStreamWriter<CommitInfoT, StateT, AggregatedCommitInfoT> extends SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT>
        implements StreamWriter {

    SparkStreamWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
                      @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
                      @Nullable SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter,
                      StructType schema) {
        super(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
    }

    // Epoch-aware commit: delegates to the batch commit logic in the superclass.
    // NOTE(review): epochId is discarded — confirm exactly-once semantics do not need it.
    @Override
    public void commit(long epochId, WriterCommitMessage[] messages) {
        super.commit(messages);
    }

    // Epoch-aware abort: delegates to the batch abort logic in the superclass.
    @Override
    public void abort(long epochId, WriterCommitMessage[] messages) {
        super.abort(messages);
    }

    // Deliberately routes the epoch-less batch commit to the StreamWriter interface default
    // (NOT the superclass): for a streaming writer this form should not be used.
    // NOTE(review): the Spark 2.4 StreamWriter default is expected to reject this call — confirm.
    @Override
    public void commit(WriterCommitMessage[] messages) {
        StreamWriter.super.commit(messages);
    }

    // Same as commit(messages): epoch-less abort goes to the interface default, not the superclass.
    @Override
    public void abort(WriterCommitMessage[] messages) {
        StreamWriter.super.abort(messages);
    }

    // Redundant override: simply forwards to the superclass factory.
    @Override
    public DataWriterFactory<InternalRow> createWriterFactory() {
        return super.createWriterFactory();
    }

    // Uses the StreamWriter interface default rather than any superclass behavior.
    @Override
    public boolean useCommitCoordinator() {
        return StreamWriter.super.useCommitCoordinator();
    }

    // Uses the StreamWriter interface default (no-op per-writer commit callback).
    @Override
    public void onDataWriterCommit(WriterCommitMessage message) {
        StreamWriter.super.onDataWriterCommit(message);
    }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java
new file mode 100644
index 00000000..7b7c4ed2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
/**
 * Converts a SeaTunnel {@link SinkWriter} into a Spark {@link StreamWriter}, carrying along
 * the committers and schema captured by {@link AbstractSparkWriterConverter}.
 */
public class SparkStreamWriterConverter extends AbstractSparkWriterConverter
        implements SinkWriterConverter<StreamWriter> {

    SparkStreamWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
                               @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
                               StructType schema) {
        super(sinkCommitter, sinkAggregatedCommitter, schema);
    }

    @Override
    public StreamWriter convert(SinkWriter<?, ?, ?> sinkWriter) {
        // Raw-type construction on purpose: the wildcard-typed arguments cannot satisfy
        // SparkStreamWriter's type parameters, so this relies on an unchecked conversion.
        return new SparkStreamWriter(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
    }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
similarity index 65%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
index fcca1c5e..b292282a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.spark.sink;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public class SparkWriterCommitMessage<T> implements WriterCommitMessage {
 
-    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
+    private T message;
 
-    void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
+    SparkWriterCommitMessage(T message) {
+        this.message = message;
+    }
 
-    void close() throws IOException;
+    public T getMessage() {
+        return message;
+    }
+
+    public void setMessage(T message) {
+        this.message = message;
+    }
 }
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index bb2e05bb..9d9b4b7b 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -67,11 +67,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-cli-1.3.1.jar
 commons-cli-1.4.jar
-commons-codec-1.10.jar
-commons-codec-1.11.jar
 commons-codec-1.13.jar
-commons-codec-1.7.jar
-commons-codec-1.9.jar
 commons-collections-3.2.2.jar
 commons-collections4-4.2.jar
 commons-collections4-4.4.jar