You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 10:31:01 UTC
[incubator-seatunnel] branch api-draft updated: [Api-Draft] Fix Flink sink type convert error. (#1890)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new dce9ec9c [Api-Draft] Fix Flink sink type convert error. (#1890)
dce9ec9c is described below
commit dce9ec9c39010c1ba0b71d6c10204f4df9f513ea
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon May 16 18:30:57 2022 +0800
[Api-Draft] Fix Flink sink type convert error. (#1890)
* Fix Flink sink type conversion error
---
.../translation/flink/sink/FlinkSink.java | 62 ++++++++++------------
.../translation/flink/sink/FlinkSinkConverter.java | 7 ++-
2 files changed, 30 insertions(+), 39 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 5fb88060..346eae70 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -30,11 +30,14 @@ import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommT, WriterStateT, GlobalCommT> {
+@SuppressWarnings("unchecked")
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable, Serializable, Serializable> {
private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
private final Map<String, String> configuration;
@@ -46,66 +49,55 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink
}
@Override
- public SinkWriter<InputT, CommT, WriterStateT> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<WriterStateT> states) throws IOException {
+ public SinkWriter<InputT, Serializable, Serializable> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<Serializable> states) throws IOException {
// TODO add subtask and parallelism.
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new DefaultSinkWriterContext(configuration, 0, 0);
+ new DefaultSinkWriterContext(configuration, 0, 0);
- FlinkSinkWriterConverter<InputT, CommT, WriterStateT> converter = new FlinkSinkWriterConverter<>();
+ FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter = new FlinkSinkWriterConverter<>();
if (states == null || states.isEmpty()) {
return converter.convert(sink.createWriter(stContext));
} else {
- return converter.convert(sink.restoreWriter(stContext, states));
+ return converter.convert(sink.restoreWriter(stContext, states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
}
}
@Override
- public Optional<Committer<CommT>> createCommitter() throws IOException {
+ public Optional<Committer<Serializable>> createCommitter() throws IOException {
- FlinkCommitterConverter<CommT> converter = new FlinkCommitterConverter<>();
+ FlinkCommitterConverter<Serializable> converter = new FlinkCommitterConverter<>();
Optional<SinkCommitter<CommT>> committer = sink.createCommitter();
- return committer.map(converter::convert);
+ return committer.map(sinkCommitter -> converter.convert((SinkCommitter<Serializable>) sinkCommitter));
}
@Override
- public Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException {
- FlinkGlobalCommitterConverter<CommT, GlobalCommT> converter = new FlinkGlobalCommitterConverter<>();
+ public Optional<GlobalCommitter<Serializable, Serializable>> createGlobalCommitter() throws IOException {
+ FlinkGlobalCommitterConverter<Serializable, Serializable> converter = new FlinkGlobalCommitterConverter<>();
Optional<SinkAggregatedCommitter<CommT, GlobalCommT>> committer = sink.createAggregatedCommitter();
- return committer.map(converter::convert);
+ return committer.map(commTGlobalCommTSinkAggregatedCommitter -> converter.convert((SinkAggregatedCommitter<Serializable,
+ Serializable>) commTGlobalCommTSinkAggregatedCommitter));
}
@Override
- public Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer() {
- if (sink.getCommitInfoSerializer().isPresent()) {
- final FlinkSimpleVersionedSerializerConverter<CommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
- final Serializer<CommT> commTSerializer = sink.getCommitInfoSerializer().get();
- return Optional.of(converter.convert(commTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>> getCommittableSerializer() {
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+ final Optional<Serializer<CommT>> commTSerializer = sink.getCommitInfoSerializer();
+ return commTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
}
@Override
- public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
- if (sink.getAggregatedCommitInfoSerializer().isPresent()) {
- final Serializer<GlobalCommT> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer().get();
- final FlinkSimpleVersionedSerializerConverter<GlobalCommT> converter = new FlinkSimpleVersionedSerializerConverter<>();
- return Optional.of(converter.convert(globalCommTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>> getGlobalCommittableSerializer() {
+ final Optional<Serializer<GlobalCommT>> globalCommTSerializer = sink.getAggregatedCommitInfoSerializer();
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+ return globalCommTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
}
@Override
- public Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer() {
- if (sink.getWriterStateSerializer().isPresent()) {
- final Serializer<WriterStateT> writerStateTSerializer = sink.getWriterStateSerializer().get();
- final FlinkSimpleVersionedSerializerConverter<WriterStateT> converter = new FlinkSimpleVersionedSerializerConverter<>();
- return Optional.of(converter.convert(writerStateTSerializer));
- } else {
- return Optional.empty();
- }
+ public Optional<SimpleVersionedSerializer<Serializable>> getWriterStateSerializer() {
+ final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+ final Optional<Serializer<WriterStateT>> writerStateTSerializer = sink.getWriterStateSerializer();
+ return writerStateTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
index 2456b9fe..61e7f67f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkConverter.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.connector.sink.Sink;
import java.util.Map;
public class FlinkSinkConverter<SeaTunnelRowT, FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>
- implements SinkConverter<
- SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT>,
- Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
+ implements SinkConverter<SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT,
+ AggregatedCommitInfoT>, Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>> {
@Override
@SuppressWarnings("unchecked")
public Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT> convert(
- SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
+ SeaTunnelSink<SeaTunnelRowT, StateT, CommitInfoT, AggregatedCommitInfoT> sink, Map<String, String> configuration) {
return (Sink<FlinkRowT, StateT, CommitInfoT, AggregatedCommitInfoT>) new FlinkSink<>(sink, configuration);
}