You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/10 05:26:03 UTC
[incubator-seatunnel] branch api-draft updated: [Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine. (#1839)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 0db318bc [Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine. (#1839)
0db318bc is described below
commit 0db318bc93858d3a7e74f6633e425d8593df91f2
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Tue May 10 13:25:57 2022 +0800
[Api-Draft] Add Flink Sink Converter to support SeaTunnel transfer to Flink engine. (#1839)
* Add Flink Sink Converter to support SeaTunnel transfer to Flink engine.
---
.../DefaultSerializer.java} | 17 ++--
...ommitter.java => DefaultSinkWriterContext.java} | 30 ++++--
.../java/org/apache/seatunnel/api/sink/Sink.java | 5 +-
.../api/sink/SinkAggregatedCommitter.java | 7 +-
.../apache/seatunnel/api/sink/SinkCommitter.java | 4 +-
.../org/apache/seatunnel/api/sink/SinkWriter.java | 24 +++--
.../serialization/SerializerConverter.java | 12 +--
.../sink/SinkAggregatedCommitterConverter.java | 11 +--
.../translation/sink/SinkCommitterConverter.java | 12 +--
.../seatunnel/translation/sink/SinkConverter.java | 13 ++-
.../translation/sink/SinkWriterConverter.java | 11 +--
.../seatunnel-translation-flink/pom.xml | 7 ++
.../FlinkSimpleVersionedSerializer.java | 30 ++++--
.../translation/flink/sink/FlinkCommitter.java | 23 ++++-
.../flink/sink/FlinkCommitterConverter.java | 16 ++--
.../flink/sink/FlinkGlobalCommitter.java | 67 +++++++++++++
.../flink/sink/FlinkGlobalCommitterConverter.java | 17 ++--
.../FlinkSimpleVersionedSerializerConverter.java | 17 ++--
.../translation/flink/sink/FlinkSink.java | 104 +++++++++++++++++++++
.../translation/flink/sink/FlinkSinkConverter.java | 17 ++--
.../translation/flink/sink/FlinkSinkWriter.java | 64 +++++++++++++
.../flink/sink/FlinkSinkWriterConverter.java | 16 ++--
22 files changed, 410 insertions(+), 114 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
similarity index 66%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index 1f38efa3..ec213c69 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -15,15 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.serialization;
import java.io.IOException;
-import java.util.List;
+import java.nio.charset.StandardCharsets;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public class DefaultSerializer implements Serializer<String> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+ @Override
+ public byte[] serialize(String obj) throws IOException {
+ return obj.getBytes(StandardCharsets.UTF_8);
+ }
- void abort() throws Exception;
+ /**
+ * Decodes bytes produced by {@link #serialize(String)} back into a string.
+ * Uses UTF-8 explicitly so decoding matches the encoding used in serialize,
+ * instead of depending on the platform default charset.
+ *
+ * @param serialized the UTF-8 encoded bytes
+ * @return the decoded string
+ * @throws IOException declared by the Serializer contract; not thrown here
+ */
+ @Override
+ public String deserialize(byte[] serialized) throws IOException {
+ return new String(serialized, StandardCharsets.UTF_8);
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
similarity index 54%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index 2dff2525..aeba2dd1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -17,14 +17,32 @@
package org.apache.seatunnel.api.sink;
-import java.io.IOException;
-import java.util.List;
+import java.util.Map;
-public interface SinkCommitter<CommitInfoT> {
+public class DefaultSinkWriterContext implements SinkWriter.Context {
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+ private final Map<String, String> configuration;
+ private final int subtask;
+ private final int parallelism;
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ public DefaultSinkWriterContext(Map<String, String> configuration, int subtask, int parallelism) {
+ this.configuration = configuration;
+ this.subtask = subtask;
+ this.parallelism = parallelism;
+ }
- void abort() throws Exception;
+ @Override
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public int getIndexOfSubtask() {
+ return subtask;
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return parallelism;
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
index 2c1fedef..cd7ba6ee 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -26,9 +26,10 @@ import java.util.Optional;
public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
- SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException;
+ SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
- default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+ default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(SinkWriter.Context context,
+ List<StateT> states) throws IOException {
return createWriter(context);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index 1f38efa3..fcca1c5e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -22,8 +22,9 @@ import java.util.List;
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+ List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
- void abort() throws Exception;
+ void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
+
+ void close() throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 2dff2525..35fbdef3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -22,9 +22,7 @@ import java.util.List;
public interface SinkCommitter<CommitInfoT> {
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
-
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
void abort() throws Exception;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 933bfeb0..4e22dd5b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -22,30 +22,36 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public interface SinkWriter<T, StateT> {
- void write(T element) throws IOException, InterruptedException;
+public interface SinkWriter<T, CommitInfoT, StateT> {
+
+ void write(T element) throws IOException;
+
+ /**
+ * Prepares the commit; called before {@link #snapshotState()}.
+ *
+ * @return the commit info that needs to be committed
+ */
+ CommitInfoT prepareCommit() throws IOException;
/**
* @return The writer's state.
* @throws IOException if fail to snapshot writer's state.
- * @deprecated implement {@link #snapshotState(long)}
*/
default List<StateT> snapshotState() throws IOException {
return Collections.emptyList();
}
/**
- * @return The writer's state.
- * @throws IOException if fail to snapshot writer's state.
+ * Called when the SinkWriter is closed.
+ *
+ * @throws IOException if closing fails
*/
- default List<StateT> snapshotState(long checkpointId) throws IOException {
- return snapshotState();
- }
+ void close() throws IOException;
interface Context {
/**
- * Gets the configuration with which Flink was started.
+ * Gets the configuration with which the job was started.
*/
Map<String, String> getConfiguration();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
index 1f38efa3..c8abb22f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/SerializerConverter.java
@@ -15,15 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.serialization;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.serialization.Serializer;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SerializerConverter<T> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
-
- void abort() throws Exception;
+ T convert(Serializer<?> serializer);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
index 1f38efa3..39aae0e5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkAggregatedCommitterConverter.java
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkAggregatedCommitterConverter<T> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+ T convert(SinkAggregatedCommitter<?, ?> sinkCommitter);
- void abort() throws Exception;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
index 1f38efa3..900d44d0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkCommitterConverter.java
@@ -15,15 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkCommitter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkCommitterConverter<T> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
-
- void abort() throws Exception;
+ T convert(SinkCommitter<?> sinkCommitter);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
index 1f38efa3..dcc1ea21 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkConverter.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.Sink;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import java.util.Map;
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+public interface SinkConverter<T> {
+
+ T convert(Sink<?, ?, ?, ?> sink, Map<String, String> configuration);
- void abort() throws Exception;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
index 1f38efa3..6be05f14 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/sink/SinkWriterConverter.java
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkWriter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public interface SinkWriterConverter<T> {
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+ T convert(SinkWriter<?, ?, ?> sinkWriter);
- void abort() throws Exception;
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index e996acdc..114d4134 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -43,6 +43,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 52%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
index 2dff2525..98042c59 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
@@ -15,16 +15,34 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
-import java.util.List;
-public interface SinkCommitter<CommitInfoT> {
+public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
+
+ private final Serializer<T> serializer;
+
+ public FlinkSimpleVersionedSerializer(Serializer<T> serializer) {
+ this.serializer = serializer;
+ }
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+ @Override
+ public int getVersion() {
+ return 0;
+ }
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ @Override
+ public byte[] serialize(T obj) throws IOException {
+ return serializer.serialize(obj);
+ }
- void abort() throws Exception;
+ @Override
+ public T deserialize(int version, byte[] serialized) throws IOException {
+ return serializer.deserialize(serialized);
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 2dff2525..296bccb8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -15,16 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.flink.api.connector.sink.Committer;
import java.io.IOException;
import java.util.List;
-public interface SinkCommitter<CommitInfoT> {
+public class FlinkCommitter<CommT> implements Committer<CommT> {
+
+ private final SinkCommitter<CommT> sinkCommitter;
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+ FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
+ this.sinkCommitter = sinkCommitter;
+ }
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ @Override
+ public List<CommT> commit(List<CommT> committables) throws IOException {
+ return sinkCommitter.commit(committables);
+ }
- void abort() throws Exception;
+ @Override
+ public void close() throws Exception {
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
similarity index 63%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
index 2dff2525..981e1376 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitterConverter.java
@@ -15,16 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.translation.sink.SinkCommitterConverter;
-public interface SinkCommitter<CommitInfoT> {
+import org.apache.flink.api.connector.sink.Committer;
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+public class FlinkCommitterConverter<CommT> implements SinkCommitterConverter<Committer<CommT>> {
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ @Override
+ public Committer<CommT> convert(SinkCommitter<?> sinkCommitter) {
+ return new FlinkCommitter(sinkCommitter);
+ }
- void abort() throws Exception;
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
new file mode 100644
index 00000000..58c8caa2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommT, GlobalCommT> {
+
+ private SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
+
+ FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
+ this.aggregatedCommitter = aggregatedCommitter;
+ }
+
+ @Override
+ public List<GlobalCommT> filterRecoveredCommittables(List globalCommittables) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public GlobalCommT combine(List<CommT> committables) throws IOException {
+ // TODO add combine logic
+ return null;
+ }
+
+ @Override
+ public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
+ List<GlobalCommT> all = new ArrayList<>();
+ globalCommittables.forEach(c -> {
+ all.addAll((List) c);
+ });
+ globalCommittables.clear();
+ globalCommittables.addAll(all);
+ return aggregatedCommitter.commit(globalCommittables);
+ }
+
+ @Override
+ public void endOfInput() throws IOException {
+ }
+
+ @Override
+ public void close() throws Exception {
+ aggregatedCommitter.close();
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
index 1f38efa3..fd51c37c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitterConverter.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.translation.sink.SinkAggregatedCommitterConverter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import org.apache.flink.api.connector.sink.GlobalCommitter;
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
-
- void abort() throws Exception;
+public class FlinkGlobalCommitterConverter<CommT, GlobalCommT> implements SinkAggregatedCommitterConverter<GlobalCommitter<CommT, GlobalCommT>> {
+ @Override
+ public GlobalCommitter<CommT, GlobalCommT> convert(SinkAggregatedCommitter<?, ?> sinkCommitter) {
+ return new FlinkGlobalCommitter(sinkCommitter);
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
similarity index 56%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
index 1f38efa3..a2160165 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSimpleVersionedSerializerConverter.java
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
+import org.apache.seatunnel.translation.serialization.SerializerConverter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+import org.apache.flink.core.io.SimpleVersionedSerializer;
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
+public class FlinkSimpleVersionedSerializerConverter<T> implements SerializerConverter<SimpleVersionedSerializer<T>> {
- void abort() throws Exception;
+ @Override
+ public SimpleVersionedSerializer<T> convert(Serializer<?> serializer) {
+ return new FlinkSimpleVersionedSerializer(serializer);
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
new file mode 100644
index 00000000..3d693714
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+
+ private final org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink;
+ private final Map<String, String> configuration;
+
+ FlinkSink(org.apache.seatunnel.api.sink.Sink<InputT, WriterStateT, CommT, GlobalCommT> sink,
+ Map<String, String> configuration) {
+ this.sink = sink;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public SinkWriter<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states) throws IOException {
+ // TODO add subtask and parallelism.
+ org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
+ new DefaultSinkWriterContext(configuration, 0, 0);
+
+ FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new FlinkSinkWriterConverter<>();
+
+ if (states == null || states.isEmpty()) {
+ return converter.convert(sink.createWriter(stContext));
+ } else {
+ return converter.convert(sink.restoreWriter(stContext, states));
+ }
+ }
+
+ @Override
+ public Optional<Committer<CommT>> createCommitter() throws IOException {
+
+ FlinkCommitterConverter<CommT> converter = new FlinkCommitterConverter<>();
+ Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
+ return committer.map(converter::convert);
+
+ }
+
+ @Override
+ public Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException {
+ FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new FlinkGlobalCommitterConverter<>();
+ Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer = sink.createAggregatedCommitter();
+ return committer.map(converter::convert);
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer() {
+ if (sink.getCommitInfoSerializer().isPresent()) {
+ return Optional.of(new FlinkSimpleVersionedSerializerConverter().convert(sink.getCommitInfoSerializer().get()));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
+ if (sink.getAggregatedCommitInfoSerializer().isPresent()) {
+ return Optional.of(new FlinkSimpleVersionedSerializerConverter().convert(sink.getAggregatedCommitInfoSerializer().get()));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
+ if (sink.getWriterStateSerializer().isPresent()) {
+ return Optional.of(new FlinkSimpleVersionedSerializerConverter().convert(sink.getWriterStateSerializer().get()));
+ } else {
+ return Optional.empty();
+ }
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
similarity index 63%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2dff2525..02c2dda3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.translation.sink.SinkConverter;
-public interface SinkCommitter<CommitInfoT> {
+import java.util.Map;
- List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+public class FlinkSinkConverter implements SinkConverter<org.apache.flink.api.connector.sink.Sink> { // Builds a Flink Sink from a SeaTunnel Sink; the Flink type is written fully qualified because the simple name Sink is already imported from the SeaTunnel API.
- List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+ @Override
+ public org.apache.flink.api.connector.sink.Sink convert(Sink<?, ?, ?, ?> sink, Map<String, String> configuration) { // sink: the SeaTunnel sink to wrap; configuration: handed unchanged to the FlinkSink constructor.
- void abort() throws Exception;
+ return new FlinkSink(sink, configuration); // NOTE(review): FlinkSink is constructed without type arguments - presumably it is generic; confirm and parameterize if so.
+
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
new file mode 100644
index 00000000..a1dc6e8a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink.SinkWriter.Context;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Flink {@link SinkWriter} that forwards every call to a SeaTunnel sink writer.
+ *
+ * <p>Incoming elements must be Flink {@link Row} instances; each one is turned into a
+ * {@link SeaTunnelRow} through {@link FlinkRowSerialization} before it reaches the delegate.
+ *
+ * @param <InputT> element type declared towards Flink (only {@link Row} is accepted at runtime)
+ * @param <CommT> commit-info type produced by the delegate writer
+ * @param <WriterStateT> state type snapshotted by the delegate writer
+ */
+public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, WriterStateT> {
+
+    /** SeaTunnel writer that receives the converted rows and all lifecycle calls. */
+    private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
+    /** Converts Flink rows into SeaTunnel rows. */
+    private final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+
+    /**
+     * @param sinkWriter the SeaTunnel writer to delegate to
+     */
+    FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter) {
+        this.sinkWriter = sinkWriter;
+    }
+
+    /**
+     * Converts the element to a {@link SeaTunnelRow} and passes it to the delegate writer.
+     *
+     * @param element the record to write; must be a Flink {@link Row}
+     * @param context Flink writer context (not used)
+     * @throws InvalidClassException if the element is {@code null} or not a {@link Row}
+     * @throws IOException if conversion or the delegate write fails
+     */
+    @Override
+    public void write(InputT element, Context context) throws IOException {
+        if (!(element instanceof Row)) {
+            // instanceof is false for null, so guard getClass(): a null element must surface as the
+            // intended InvalidClassException rather than a NullPointerException raised while
+            // building the error message.
+            throw new InvalidClassException("only support Flink Row at now, the element Class is "
+                + (element == null ? null : element.getClass()));
+        }
+        sinkWriter.write(rowSerialization.deserialize((Row) element));
+    }
+
+    /**
+     * Asks the delegate for its commit info and returns it as a one-element list.
+     *
+     * @param flush not forwarded; the delegate's {@code prepareCommit()} takes no argument
+     * @return an immutable list holding exactly the delegate's result
+     * @throws IOException if the delegate fails
+     */
+    @Override
+    public List<CommT> prepareCommit(boolean flush) throws IOException {
+        // NOTE(review): the delegate's result is wrapped as-is; confirm it never returns null.
+        return Collections.singletonList(sinkWriter.prepareCommit());
+    }
+
+    /**
+     * @return the state list reported by the delegate writer
+     * @throws IOException if the delegate fails
+     */
+    @Override
+    public List<WriterStateT> snapshotState() throws IOException {
+        return sinkWriter.snapshotState();
+    }
+
+    /**
+     * Closes the delegate writer.
+     *
+     * @throws Exception whatever the delegate's {@code close()} throws
+     */
+    @Override
+    public void close() throws Exception {
+        sinkWriter.close();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
similarity index 59%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
index 1f38efa3..9520b27d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.flink.sink;
-import java.io.IOException;
-import java.util.List;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.sink.SinkWriterConverter;
-public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> { // Wraps a SeaTunnel SinkWriter in a FlinkSinkWriter so it can be returned as a Flink SinkWriter.
- List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
- throws IOException, InterruptedException;
-
- void abort() throws Exception;
+ @Override
+ public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) { // Here the simple name SinkWriter is the imported SeaTunnel type; the Flink type is spelled out in full.
+ return new FlinkSinkWriter(sinkWriter); // Raw construction: the wildcard-typed writer is handed over without a generic type check. NOTE(review): confirm callers always supply a writer matching CommT/WriterStateT.
+ }
}