You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 03:42:56 UTC
[incubator-seatunnel] branch api-draft updated: [Api-draft] Support Spark Sink convert to SeaTunnel (#1863)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 48423a92 [Api-draft] Support Spark Sink convert to SeaTunnel (#1863)
48423a92 is described below
commit 48423a92138838fa3dec7ce6bb05f338d63ec4df
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri May 13 11:42:51 2022 +0800
[Api-draft] Support Spark Sink convert to SeaTunnel (#1863)
* Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.
* Support Spark sink translation
* change license
---
pom.xml | 7 ++
.../seatunnel/api/serialization/Serializer.java | 3 +-
.../api/sink/SinkAggregatedCommitter.java | 5 +-
.../apache/seatunnel/api/sink/SinkCommitter.java | 5 +-
.../org/apache/seatunnel/api/sink/SinkWriter.java | 3 +-
seatunnel-common/pom.xml | 5 ++
.../seatunnel/common/utils/SerializationUtils.java | 37 +++++----
seatunnel-dist/release-docs/LICENSE | 4 -
.../flink/sink/FlinkGlobalCommitter.java | 10 +--
.../spark/serialization/SparkRowSerialization.java | 10 +++
.../spark/sink/AbstractSparkWriterConverter.java | 40 ++++++++++
.../spark/sink/SparkDataSourceWriter.java | 90 ++++++++++++++++++++++
.../spark/sink/SparkDataSourceWriterConverter.java | 43 +++++++++++
.../translation/spark/sink/SparkDataWriter.java | 72 +++++++++++++++++
.../spark/sink/SparkDataWriterFactory.java | 51 ++++++++++++
.../translation/spark/sink/SparkSink.java | 86 +++++++++++++++++++++
.../translation/spark/sink/SparkSinkConverter.java | 37 +++++++++
.../translation/spark/sink/SparkSinkInjector.java | 41 ++++++++++
.../translation/spark/sink/SparkStreamWriter.java | 77 ++++++++++++++++++
.../spark/sink/SparkStreamWriterConverter.java | 43 +++++++++++
.../spark/sink/SparkWriterCommitMessage.java | 21 +++--
tools/dependencies/known-dependencies.txt | 4 -
22 files changed, 646 insertions(+), 48 deletions(-)
diff --git a/pom.xml b/pom.xml
index ad0cfdc4..9a5382b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,7 @@
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<spark.scope>provided</spark.scope>
<flink.scope>provided</flink.scope>
+ <codec.version>1.13</codec.version>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
<httpcore-nio.version>4.4.4</httpcore-nio.version>
@@ -295,6 +296,12 @@
<version>${spoiwo.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${codec.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.typesafe.play</groupId>
<artifactId>play-mailer_${scala.binary.version}</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index 61703486..fe619944 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.api.serialization;
import java.io.IOException;
+import java.io.Serializable;
-public interface Serializer<T> {
+public interface Serializer<T> extends Serializable{
/**
* Serializes the given object.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index fcca1c5e..b9d386a2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -18,12 +18,15 @@
package org.apache.seatunnel.api.sink;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
+ AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
+
void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
void close() throws IOException;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 35fbdef3..f734f12a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -18,11 +18,12 @@
package org.apache.seatunnel.api.sink;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
-public interface SinkCommitter<CommitInfoT> {
+public interface SinkCommitter<CommitInfoT> extends Serializable {
List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
- void abort() throws Exception;
+ void abort() throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 4e22dd5b..eeaa66b7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -18,11 +18,12 @@
package org.apache.seatunnel.api.sink;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-public interface SinkWriter<T, CommitInfoT, StateT> {
+public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable{
void write(T element) throws IOException;
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index c6033308..99e1bb19 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -44,6 +44,11 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
similarity index 54%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 61703486..0b6e9c52 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.serialization;
+package org.apache.seatunnel.common.utils;
-import java.io.IOException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
-public interface Serializer<T> {
+import java.io.Serializable;
- /**
- * Serializes the given object.
- *
- * @param obj The object to serialize.
- * @return The serialized data (bytes).
- * @throws IOException Thrown, if the serialization fails.
- */
- byte[] serialize(T obj) throws IOException;
+public class SerializationUtils {
- /**
- * De-serializes the given data (bytes).
- *
- * @param serialized The serialized data
- * @return The deserialized object
- * @throws IOException Thrown, if the deserialization fails.
- */
- T deserialize(byte[] serialized) throws IOException;
+ public static String objectToString(Serializable obj) {
+ if (obj != null) {
+ return Base64.encodeBase64String(org.apache.commons.lang3.SerializationUtils.serialize(obj));
+ }
+ return null;
+ }
+
+ public static <T extends Serializable> T stringToObject(String str) {
+ if (StringUtils.isNotEmpty(str)) {
+ return org.apache.commons.lang3.SerializationUtils.deserialize(Base64.decodeBase64(str));
+ }
+ return null;
+ }
}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 9dfb0fca..636eb0d1 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -343,8 +343,6 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons BeanUtils (commons-beanutils:commons-beanutils:1.9.3 - https://commons.apache.org/proper/commons-beanutils/)
(Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.3.1 - http://commons.apache.org/proper/commons-cli/)
(Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.4 - http://commons.apache.org/proper/commons-cli/)
- (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.10 - http://commons.apache.org/proper/commons-codec/)
- (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.11 - http://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (commons-collections:commons-collections:3.2.2 - http://commons.apache.org/collections/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.2 - http://commons.apache.org/proper/commons-collections/)
@@ -616,7 +614,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Apache Avro IPC (org.apache.avro:avro-ipc:1.8.2 - http://avro.apache.org)
(The Apache Software License, Version 2.0) Apache Avro Mapred API (org.apache.avro:avro-mapred:1.8.2 - http://avro.apache.org/avro-mapred)
(The Apache Software License, Version 2.0) Apache Commons CSV (org.apache.commons:commons-csv:1.0 - http://commons.apache.org/proper/commons-csv/)
- (The Apache Software License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.9 - http://commons.apache.org/proper/commons-codec/)
(The Apache Software License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.8.1 - http://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Apache Commons DBCP (org.apache.commons:commons-dbcp2:2.0.1 - http://commons.apache.org/proper/commons-dbcp/)
(The Apache Software License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.3.2 - http://commons.apache.org/proper/commons-lang/)
@@ -673,7 +670,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) ClassMate (com.fasterxml:classmate:1.1.0 - http://github.com/cowtowncoder/java-classmate)
(The Apache Software License, Version 2.0) Commons BeanUtils Core (commons-beanutils:commons-beanutils-core:1.8.0 - http://commons.apache.org/beanutils/)
(The Apache Software License, Version 2.0) Commons CLI (commons-cli:commons-cli:1.2 - http://commons.apache.org/cli/)
- (The Apache Software License, Version 2.0) Commons Codec (commons-codec:commons-codec:1.7 - http://commons.apache.org/codec/)
(The Apache Software License, Version 2.0) Commons Compress (org.apache.commons:commons-compress:1.4.1 - http://commons.apache.org/compress/)
(The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.6 - http://commons.apache.org/${pom.artifactId.substring(8)}/)
(The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.7 - http://commons.apache.org/configuration/)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index cd0ef171..7c39cf24 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -41,18 +40,11 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter
@Override
public GlobalCommT combine(List<CommT> committables) throws IOException {
- // TODO add combine logic
- return null;
+ return aggregatedCommitter.combine(committables);
}
@Override
public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
- List<GlobalCommT> all = new ArrayList<>();
- globalCommittables.forEach(c -> {
- all.addAll((List) c);
- });
- globalCommittables.clear();
- globalCommittables.addAll(all);
return aggregatedCommitter.commit(globalCommittables);
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
index 5eb635c6..5d9aa995 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/SparkRowSerialization.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.translation.serialization.RowSerialization;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -40,4 +42,12 @@ public class SparkRowSerialization implements RowSerialization<Row> {
}
return new SeaTunnelRow(fields);
}
+
+ public SeaTunnelRow deserialize(StructType schema, InternalRow engineRow) throws IOException {
+ Object[] fields = new Object[engineRow.numFields()];
+ for (int i = 0; i < engineRow.numFields(); i++) {
+ fields[i] = engineRow.get(i, schema.apply(i).dataType());
+ }
+ return new SeaTunnelRow(fields);
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java
new file mode 100644
index 00000000..ed50d2bf
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/AbstractSparkWriterConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public abstract class AbstractSparkWriterConverter {
+
+ protected final SinkCommitter<?> sinkCommitter;
+ protected final SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter;
+ protected final StructType schema;
+
+ AbstractSparkWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
+ @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
+ StructType schema) {
+ this.sinkCommitter = sinkCommitter;
+ this.sinkAggregatedCommitter = sinkAggregatedCommitter;
+ this.schema = schema;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
new file mode 100644
index 00000000..dbf813b8
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT> implements DataSourceWriter {
+
+ private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+ @Nullable
+ private final SinkCommitter<CommitInfoT> sinkCommitter;
+ @Nullable
+ private final SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter;
+ private final StructType schema;
+
+ SparkDataSourceWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+ @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
+ @Nullable SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter,
+ StructType schema) {
+ this.sinkWriter = sinkWriter;
+ this.sinkCommitter = sinkCommitter;
+ this.sinkAggregatedCommitter = sinkAggregatedCommitter;
+ this.schema = schema;
+ }
+
+ @Override
+ public DataWriterFactory<InternalRow> createWriterFactory() {
+ return new SparkDataWriterFactory<>(sinkWriter, sinkCommitter, schema);
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ if (sinkAggregatedCommitter != null) {
+ try {
+ sinkAggregatedCommitter.commit(combineCommitMessage(messages));
+ } catch (IOException e) {
+ throw new RuntimeException("commit failed in driver", e);
+ }
+ }
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ if (sinkAggregatedCommitter != null) {
+ try {
+ sinkAggregatedCommitter.abort(combineCommitMessage(messages));
+ } catch (Exception e) {
+ throw new RuntimeException("abort failed in driver", e);
+ }
+ }
+ }
+
+ private List<AggregatedCommitInfoT> combineCommitMessage(WriterCommitMessage[] messages) {
+ return Collections.singletonList(sinkAggregatedCommitter.combine(
+ Arrays.stream(messages).map(m -> ((SparkWriterCommitMessage<CommitInfoT>) m).getMessage())
+ .collect(Collectors.toList())));
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java
new file mode 100644
index 00000000..83e09b58
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataSourceWriterConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public class SparkDataSourceWriterConverter extends AbstractSparkWriterConverter
+ implements SinkWriterConverter<DataSourceWriter> {
+
+ SparkDataSourceWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
+ @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
+ StructType schema) {
+ super(sinkCommitter, sinkAggregatedCommitter, schema);
+ }
+
+ @Override
+ public DataSourceWriter convert(SinkWriter<?, ?, ?> sinkWriter) {
+ return new SparkDataSourceWriter(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
new file mode 100644
index 00000000..e17d99e4
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.spark.serialization.SparkRowSerialization;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<InternalRow> {
+
+ private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+
+ @Nullable
+ private final SinkCommitter<CommitInfoT> sinkCommitter;
+ private final StructType schema;
+ private final SparkRowSerialization rowSerialization = new SparkRowSerialization();
+
+ SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+ SinkCommitter<CommitInfoT> sinkCommitter,
+ StructType schema) {
+ this.sinkWriter = sinkWriter;
+ this.sinkCommitter = sinkCommitter;
+ this.schema = schema;
+ }
+
+ @Override
+ public void write(InternalRow record) throws IOException {
+ sinkWriter.write(rowSerialization.deserialize(schema, record));
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ CommitInfoT commitInfo = sinkWriter.prepareCommit();
+ if (sinkCommitter != null) {
+ sinkCommitter.commit(Collections.singletonList(commitInfo));
+ }
+ return new SparkWriterCommitMessage<>(commitInfo);
+ }
+
+ @Override
+ public void abort() throws IOException {
+ if (sinkCommitter != null) {
+ sinkCommitter.abort();
+ }
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
new file mode 100644
index 00000000..4c8e239c
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFactory<InternalRow> {
+
+ private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;
+ @Nullable
+ private final SinkCommitter<CommitInfoT> sinkCommitter;
+ private final StructType schema;
+
+ SparkDataWriterFactory(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+ @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
+ StructType schema) {
+ this.sinkWriter = sinkWriter;
+ this.sinkCommitter = sinkCommitter;
+ this.schema = schema;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+ // TODO use partitionID, taskId, epochId information.
+ return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema);
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
new file mode 100644
index 00000000..5a07efb2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public class SparkSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
+ StreamWriteSupport, DataSourceV2 {
+
+ private SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+ private Map<String, String> configuration;
+
+ private void init(DataSourceOptions options) {
+ if (sink == null) {
+ this.sink = SerializationUtils.stringToObject(
+ options.get("sink").orElseThrow(() -> new IllegalArgumentException("can not find sink " +
+ "class string in DataSourceOptions")));
+ this.configuration = SerializationUtils.stringToObject(
+ options.get("configuration").orElseThrow(() -> new IllegalArgumentException("can not " +
+ "find configuration class string in DataSourceOptions")));
+ }
+ }
+
+ @Override
+ public StreamWriter createStreamWriter(String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
+
+ init(options);
+ // TODO add subtask and parallelism.
+ org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+ new DefaultSinkWriterContext(configuration, 0, 0);
+
+ try {
+ return new SparkStreamWriterConverter(sink.createCommitter().orElse(null),
+ sink.createAggregatedCommitter().orElse(null), schema).convert(sink.createWriter(stContext));
+ } catch (IOException e) {
+ throw new RuntimeException("find error when createStreamWriter", e);
+ }
+ }
+
+ @Override
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
+
+ init(options);
+ // TODO add subtask and parallelism.
+ org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+ new DefaultSinkWriterContext(configuration, 0, 0);
+
+ try {
+ return Optional.of(new SparkDataSourceWriterConverter(sink.createCommitter().orElse(null),
+ sink.createAggregatedCommitter().orElse(null), schema).convert(sink.createWriter(stContext)));
+ } catch (IOException e) {
+ throw new RuntimeException("find error when createStreamWriter", e);
+ }
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java
new file mode 100644
index 00000000..d7f01474
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.translation.sink.SinkConverter;
+
+import org.apache.spark.sql.sources.v2.WriteSupport;
+
+import java.util.Map;
+
+/**
+ * Placeholder {@link SinkConverter} implementation for the Spark engine.
+ *
+ * <p>Spark loads its DataSourceV2 sink class by name via reflection/SPI
+ * (see {@code SparkSinkInjector}), so a direct in-process conversion is not
+ * possible; every call to {@link #convert} therefore throws.
+ */
+public class SparkSinkConverter<I, StateT, CommitInfoT, AggregatedCommitInfoT>
+    implements SinkConverter<SeaTunnelSink<I, StateT, CommitInfoT, AggregatedCommitInfoT>, WriteSupport> {
+
+    @Override
+    public WriteSupport convert(SeaTunnelSink<I, StateT, CommitInfoT, AggregatedCommitInfoT> sink,
+                                Map<String, String> configuration) {
+        // Fixed garbled wording in the message: "Spark use refactor" -> "Spark uses reflection".
+        throw new UnsupportedOperationException("Do not use SinkConverter to convert SeaTunnel Sink in " +
+            "Spark engine, because Spark uses reflection and SPI to load the sink class. Use SparkSinkInjector" +
+            " instead.");
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
new file mode 100644
index 00000000..8aeb4537
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+
+import java.util.HashMap;
+
+/**
+ * Injects a SeaTunnel sink into a Spark {@link DataStreamWriter}.
+ *
+ * <p>The sink and its configuration map are Java-serialized to strings and
+ * placed in DataSource options under the keys {@code "sink"} and
+ * {@code "configuration"}, which must match what {@code SparkSink.init}
+ * reads back; Spark then instantiates {@code SparkSink} reflectively by the
+ * fully-qualified class name passed to {@code format(...)}.
+ */
+public final class SparkSinkInjector {
+
+    private static final String SPARK_SINK_CLASS_NAME = "org.apache.seatunnel.translation.spark.sink.SparkSink";
+
+    private SparkSinkInjector() {
+        // Utility class: static methods only, no instances.
+    }
+
+    public static DataStreamWriter<Row> inject(Dataset<Row> dataset, SeaTunnelSink<?, ?, ?, ?> sink,
+                                               HashMap<String, String> configuration) {
+        return dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+            .option("configuration", SerializationUtils.objectToString(configuration))
+            .option("sink", SerializationUtils.objectToString(sink));
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
new file mode 100644
index 00000000..42e3c8e1
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+// Streaming adapter: reuses the batch SparkDataSourceWriter machinery and maps
+// Spark's epoch-based StreamWriter callbacks onto it.
+public class SparkStreamWriter<CommitInfoT, StateT, AggregatedCommitInfoT> extends SparkDataSourceWriter<CommitInfoT, StateT, AggregatedCommitInfoT>
+ implements StreamWriter {
+
+ // Package-private: constructed only via SparkStreamWriterConverter.
+ SparkStreamWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
+ @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
+ @Nullable SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> sinkAggregatedCommitter,
+ StructType schema) {
+ super(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
+ }
+
+ // Epoch commit delegates to the batch commit logic in SparkDataSourceWriter.
+ // NOTE(review): epochId is discarded — confirm exactly-once semantics don't
+ // require tracking it.
+ @Override
+ public void commit(long epochId, WriterCommitMessage[] messages) {
+ super.commit(messages);
+ }
+
+ // Epoch abort likewise delegates to the batch abort logic.
+ @Override
+ public void abort(long epochId, WriterCommitMessage[] messages) {
+ super.abort(messages);
+ }
+
+ // Batch-signature commit is redirected to the StreamWriter interface default
+ // instead of the inherited SparkDataSourceWriter implementation.
+ // NOTE(review): Spark's StreamWriter default for this signature rejects
+ // epoch-less commits — presumably that guard is intended here; confirm.
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ StreamWriter.super.commit(messages);
+ }
+
+ // Same redirection for the batch-signature abort.
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ StreamWriter.super.abort(messages);
+ }
+
+ // Pure pass-through to the inherited factory; override kept for explicitness.
+ @Override
+ public DataWriterFactory<InternalRow> createWriterFactory() {
+ return super.createWriterFactory();
+ }
+
+ // Defers to the StreamWriter interface default rather than any inherited
+ // batch behavior.
+ @Override
+ public boolean useCommitCoordinator() {
+ return StreamWriter.super.useCommitCoordinator();
+ }
+
+ // Defers to the StreamWriter interface default (per-writer commit callback).
+ @Override
+ public void onDataWriterCommit(WriterCommitMessage message) {
+ StreamWriter.super.onDataWriterCommit(message);
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java
new file mode 100644
index 00000000..7b7c4ed2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkStreamWriterConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.types.StructType;
+
+import javax.annotation.Nullable;
+
+// Converter that wraps a SeaTunnel SinkWriter (plus optional committers and
+// the Spark schema, held by AbstractSparkWriterConverter) into a Spark
+// streaming StreamWriter.
+public class SparkStreamWriterConverter extends AbstractSparkWriterConverter
+ implements SinkWriterConverter<StreamWriter> {
+
+ // Package-private: constructed only by SparkSink when creating a stream writer.
+ SparkStreamWriterConverter(@Nullable SinkCommitter<?> sinkCommitter,
+ @Nullable SinkAggregatedCommitter<?, ?> sinkAggregatedCommitter,
+ StructType schema) {
+ super(sinkCommitter, sinkAggregatedCommitter, schema);
+ }
+
+ // NOTE(review): raw-type construction of the generic SparkStreamWriter —
+ // the wildcard writer/committers are passed through unchecked.
+ @Override
+ public StreamWriter convert(SinkWriter<?, ?, ?> sinkWriter) {
+ return new SparkStreamWriter(sinkWriter, sinkCommitter, sinkAggregatedCommitter, schema);
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
similarity index 65%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
index fcca1c5e..b292282a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkWriterCommitMessage.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.spark.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+// Generic carrier that wraps engine-agnostic SeaTunnel commit info of type T
+// so it can travel through Spark's DataSourceV2 commit protocol as a
+// WriterCommitMessage.
+// NOTE(review): WriterCommitMessage is serialized by Spark — presumably T
+// must be Serializable; confirm callers only pass serializable commit info.
+public class SparkWriterCommitMessage<T> implements WriterCommitMessage {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
+ // The wrapped SeaTunnel commit info payload.
+ private T message;
- void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
+ // Package-private: constructed only within the spark.sink translation package.
+ SparkWriterCommitMessage(T message) {
+ this.message = message;
+ }
- void close() throws IOException;
+ public T getMessage() {
+ return message;
+ }
+
+ public void setMessage(T message) {
+ this.message = message;
+ }
 }
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index bb2e05bb..9d9b4b7b 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -67,11 +67,7 @@ commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-cli-1.3.1.jar
commons-cli-1.4.jar
-commons-codec-1.10.jar
-commons-codec-1.11.jar
commons-codec-1.13.jar
-commons-codec-1.7.jar
-commons-codec-1.9.jar
commons-collections-3.2.2.jar
commons-collections4-4.2.jar
commons-collections4-4.4.jar