You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 10:31:01 UTC

[incubator-seatunnel] branch api-draft updated: [Api-Draft] Fix Flink sink type convert error. (#1890)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new dce9ec9c [Api-Draft] Fix Flink sink type convert error. (#1890)
dce9ec9c is described below

commit dce9ec9c39010c1ba0b71d6c10204f4df9f513ea
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon May 16 18:30:57 2022 +0800

    [Api-Draft] Fix Flink sink type convert error. (#1890)
    
    * fix Flink sink type convert error
---
 .../translation/flink/sink/FlinkSink.java          | 62 ++++++++++------------
 .../translation/flink/sink/FlinkSinkConverter.java |  7 ++-
 2 files changed, 30 insertions(+), 39 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 5fb88060..346eae70 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -30,11 +30,14 @@ import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+@SuppressWarnings("unchecked")
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable, Serializable, Serializable> {
 
     private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
     private final Map<String, String> configuration;
@@ -46,66 +49,55 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink
     }
 
     @Override
-    public SinkWriter<InputT, CommT, WriterStateT> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<WriterStateT> states) throws IOException {
+    public SinkWriter<InputT, Serializable, Serializable> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<Serializable> states) throws IOException {
         // TODO add subtask and parallelism.
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-            new DefaultSinkWriterContext(configuration, 0, 0);
+                new DefaultSinkWriterContext(configuration, 0, 0);
 
-        FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new FlinkSinkWriterConverter<>();
+        FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter = new FlinkSinkWriterConverter<>();
 
         if (states == null || states.isEmpty()) {
             return converter.convert(sink.createWriter(stContext));
         } else {
-            return converter.convert(sink.restoreWriter(stContext, states));
+            return converter.convert(sink.restoreWriter(stContext, states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
         }
     }
 
     @Override
-    public Optional<Committer<CommT>> createCommitter() throws IOException {
+    public Optional<Committer<Serializable>> createCommitter() throws IOException {
 
-        FlinkCommitterConverter<CommT> converter = new FlinkCommitterConverter<>();
+        FlinkCommitterConverter<Serializable> converter = new FlinkCommitterConverter<>();
         Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
-        return committer.map(converter::convert);
+        return committer.map(sinkCommitter -> converter.convert((SinkCommitter<Serializable>) sinkCommitter));
 
     }
 
     @Override
-    public Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException {
-        FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new FlinkGlobalCommitterConverter<>();
+    public Optional<GlobalCommitter<Serializable, Serializable>> createGlobalCommitter() throws IOException {
+        FlinkGlobalCommitterConverter<Serializable, Serializable> converter = new FlinkGlobalCommitterConverter<>();
         Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer = sink.createAggregatedCommitter();
-        return committer.map(converter::convert);
+        return committer.map(commTGlobalCommTSinkAggregatedCommitter -> converter.convert((SinkAggregatedCommitter<Serializable,
+                Serializable>) commTGlobalCommTSinkAggregatedCommitter));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer() {
-        if (sink.getCommitInfoSerializer().isPresent()) {
-            final FlinkSimpleVersionedSerializerConverter<CommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            final Serializer<CommT> commTSerializer = sink.getCommitInfoSerializer().get();
-            return Optional.of(converter.convert(commTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getCommittableSerializer() {
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        final Optional<Serializer<CommT>> commTSerializer = sink.getCommitInfoSerializer();
+        return commTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
-        if (sink.getAggregatedCommitInfoSerializer().isPresent()) {
-            final Serializer<GlobalCommT> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer().get();
-            final FlinkSimpleVersionedSerializerConverter<GlobalCommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            return Optional.of(converter.convert(globalCommTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getGlobalCommittableSerializer() {
+        final Optional<Serializer<GlobalCommT>> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer();
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        return globalCommTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
-        if (sink.getWriterStateSerializer().isPresent()) {
-            final Serializer<WriterStateT> writerStateTSerializer = sink.getWriterStateSerializer().get();
-            final FlinkSimpleVersionedSerializerConverter<WriterStateT> converter = new FlinkSimpleVersionedSerializerConverter<>();
-            return Optional.of(converter.convert(writerStateTSerializer));
-        } else {
-            return Optional.empty();
-        }
+    public Optional<SimpleVersionedSerializer<Serializable>> getWriterStateSerializer() {
+        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+        final Optional<Serializer<WriterStateT>> writerStateTSerializer = sink.getWriterStateSerializer();
+        return writerStateTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2456b9fe..61e7f67f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.connector.sink.Sink;
 import java.util.Map;
 
 public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>
-    implements SinkConverter<
-    SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
-    Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+        implements SinkConverter<SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT,
+        AggregatedCommitInfoT>, Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
-        SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+            SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
         return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
 
     }