You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/09 08:02:27 UTC
[incubator-seatunnel] branch api-draft updated: [Api-draft] add jobId and checkpointId (#1998)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 805f88cb [Api-draft] add jobId and checkpointId (#1998)
805f88cb is described below
commit 805f88cbc0364b3b1322b242e0720e91d1797c7a
Author: Hisoka <10...@qq.com>
AuthorDate: Thu Jun 9 16:02:21 2022 +0800
[Api-draft] add jobId and checkpointId (#1998)
* add jobId in SeaTunnelContext
* add checkpointId in sink
---
.../seatunnel/api/common/SeaTunnelContext.java | 11 ++++++--
.../org/apache/seatunnel/api/sink/SinkWriter.java | 4 +--
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 2 +-
.../translation/flink/sink/FlinkSink.java | 20 ++++++++------
.../translation/flink/sink/FlinkSinkWriter.java | 15 ++++++++---
.../flink/sink/FlinkSinkWriterConverter.java | 12 ++++++---
...kWriterConverter.java => FlinkWriterState.java} | 31 +++++++++++++++++-----
.../translation/spark/sink/SparkDataWriter.java | 5 +++-
.../spark/sink/SparkDataWriterFactory.java | 4 +--
9 files changed, 75 insertions(+), 29 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
index 60b86a9a..b6152e44 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.common.constants.JobMode;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -36,10 +37,12 @@ public final class SeaTunnelContext implements Serializable {
private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
// tableName -> tableSchema
- private Map<String, TableSchema> tableSchemaMap = new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ private final Map<String, TableSchema> tableSchemaMap = new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
private JobMode jobMode;
+ private final String jobId;
+
public static SeaTunnelContext getContext() {
return INSTANCE;
}
@@ -73,8 +76,12 @@ public final class SeaTunnelContext implements Serializable {
return jobMode;
}
+ public String getJobId() {
+ return this.jobId;
+ }
+
private SeaTunnelContext() {
- // no-op
+ this.jobId = UUID.randomUUID().toString().replace("-", "");
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 6ce5a79a..4e6a6d99 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -43,7 +43,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
void write(T element) throws IOException;
/**
- * prepare the commit, will be called before {@link #snapshotState()}.
+ * prepare the commit, will be called before {@link #snapshotState(long checkpointId)}.
* If you need to use 2pc, you can return the commit info in this method, and receive the commit info in {@link SinkCommitter#commit(List)}.
*
* @return the commit info need to commit
@@ -54,7 +54,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
* @return The writer's state.
* @throws IOException if fail to snapshot writer's state.
*/
- default List<StateT> snapshotState() throws IOException {
+ default List<StateT> snapshotState(long checkpointId) throws IOException {
return Collections.emptyList();
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 4f222a9d..d7f1d5e6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -77,7 +77,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
@Override
- public List<KafkaSinkState> snapshotState() {
+ public List<KafkaSinkState> snapshotState(long checkpointId) {
return kafkaProducerSender.snapshotState();
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 346eae70..0218fb7a 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -37,7 +37,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
@SuppressWarnings("unchecked")
-public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable, Serializable, Serializable> {
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable,
+ FlinkWriterState<WriterStateT>, Serializable> {
private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
private final Map<String, String> configuration;
@@ -49,17 +50,20 @@ public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink
}
@Override
- public SinkWriter<InputT, Serializable, Serializable> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<Serializable> states) throws IOException {
+ public SinkWriter<InputT, Serializable, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
// TODO add subtask and parallelism.
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
new DefaultSinkWriterContext(configuration, 0, 0);
- FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter = new FlinkSinkWriterConverter<>();
-
if (states == null || states.isEmpty()) {
+ FlinkSinkWriterConverter<InputT, Serializable, WriterStateT> converter =
+ new FlinkSinkWriterConverter<>(1);
return converter.convert(sink.createWriter(stContext));
} else {
- return converter.convert(sink.restoreWriter(stContext, states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
+ FlinkSinkWriterConverter<InputT, Serializable, WriterStateT> converter =
+ new FlinkSinkWriterConverter<>(states.get(0).getCheckpointId());
+ return converter.convert(sink.restoreWriter(stContext,
+ states.stream().map(FlinkWriterState::getState).collect(Collectors.toList())));
}
}
@@ -95,9 +99,9 @@ public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink
}
@Override
- public Optional<SimpleVersionedSerializer<Serializable>> getWriterStateSerializer() {
- final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+ public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>> getWriterStateSerializer() {
+ final FlinkSimpleVersionedSerializerConverter<FlinkWriterState<WriterStateT>> converter = new FlinkSimpleVersionedSerializerConverter<>();
final Optional<Serializer<WriterStateT>> writerStateTSerializer = sink.getWriterStateSerializer();
- return writerStateTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
+ return writerStateTSerializer.map(serializer -> converter.convert((Serializer<FlinkWriterState<WriterStateT>>) serializer));
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 9f4f1a7b..38ba20c2 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -29,14 +29,18 @@ import java.io.InvalidClassException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
-public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, WriterStateT> {
+public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>> {
private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
private final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+ private long checkpointId;
- FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter) {
+ FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
+ long checkpointId) {
this.sinkWriter = sinkWriter;
+ this.checkpointId = checkpointId;
}
@Override
@@ -57,8 +61,11 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
}
@Override
- public List<WriterStateT> snapshotState() throws IOException {
- return sinkWriter.snapshotState();
+ public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
+ List<FlinkWriterState<WriterStateT>> states = sinkWriter.snapshotState(this.checkpointId)
+ .stream().map(state -> new FlinkWriterState<>(this.checkpointId, state)).collect(Collectors.toList());
+ this.checkpointId++;
+ return states;
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
index 9520b27d..e86cd5d6 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
@@ -20,10 +20,16 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.translation.sink.SinkWriterConverter;
-public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> {
+public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>>> {
+
+ private final long checkpointId;
+
+ FlinkSinkWriterConverter(long checkpointId) {
+ this.checkpointId = checkpointId;
+ }
@Override
- public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) {
- return new FlinkSinkWriter(sinkWriter);
+ public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>> convert(SinkWriter<?, ?, ?> sinkWriter) {
+ return new FlinkSinkWriter(sinkWriter, this.checkpointId);
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 59%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
index 9520b27d..095fb07d 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
@@ -17,13 +17,32 @@
package org.apache.seatunnel.translation.flink.sink;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+import java.io.Serializable;
-public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> {
+public class FlinkWriterState<StateT> implements Serializable {
- @Override
- public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) {
- return new FlinkSinkWriter(sinkWriter);
+ private long checkpointId = 0;
+
+ private StateT state;
+
+ public FlinkWriterState(long checkpointId, StateT state) {
+ this.checkpointId = checkpointId;
+ this.state = state;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ public void setCheckpointId(long checkpointId) {
+ this.checkpointId = checkpointId;
+ }
+
+ public StateT getState() {
+ return state;
+ }
+
+ public void setState(StateT state) {
+ this.state = state;
}
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 8aab52db..95ed6fd3 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -42,13 +42,15 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
private final SinkCommitter<CommitInfoT> sinkCommitter;
private final RowSerialization<InternalRow> rowSerialization;
private CommitInfoT latestCommitInfoT;
+ private long epochId;
SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
@Nullable SinkCommitter<CommitInfoT> sinkCommitter,
- StructType schema) {
+ StructType schema, long epochId) {
this.sinkWriter = sinkWriter;
this.sinkCommitter = sinkCommitter;
this.rowSerialization = new InternalRowSerialization(schema);
+ this.epochId = epochId == 0 ? 1 : epochId;
}
@Override
@@ -66,6 +68,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
// 2.1. We have the commit info, we need to execute the sinkCommitter#abort to rollback the transaction.
Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = commitInfoT);
+ sinkWriter.snapshotState(epochId++);
if (sinkCommitter != null) {
if (latestCommitInfoT == null) {
sinkCommitter.commit(Collections.emptyList());
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
index 4c8e239c..a3654ce6 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -45,7 +45,7 @@ public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFa
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- // TODO use partitionID, taskId, epochId information.
- return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema);
+ // TODO use partitionID, taskId information.
+ return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema, epochId);
}
}