You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/09 08:02:27 UTC

[incubator-seatunnel] branch api-draft updated: [Api-draft] add jobId and checkpointId (#1998)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 805f88cb [Api-draft] add jobId and checkpointId (#1998)
805f88cb is described below

commit 805f88cbc0364b3b1322b242e0720e91d1797c7a
Author: Hisoka <10...@qq.com>
AuthorDate: Thu Jun 9 16:02:21 2022 +0800

    [Api-draft] add jobId and checkpointId (#1998)
    
    * add jobId in SeaTunnelContext
    
    * add checkpointId in sink
---
 .../seatunnel/api/common/SeaTunnelContext.java     | 11 ++++++--
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  4 +--
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  2 +-
 .../translation/flink/sink/FlinkSink.java          | 20 ++++++++------
 .../translation/flink/sink/FlinkSinkWriter.java    | 15 ++++++++---
 .../flink/sink/FlinkSinkWriterConverter.java       | 12 ++++++---
 ...kWriterConverter.java => FlinkWriterState.java} | 31 +++++++++++++++++-----
 .../translation/spark/sink/SparkDataWriter.java    |  5 +++-
 .../spark/sink/SparkDataWriterFactory.java         |  4 +--
 9 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
index 60b86a9a..b6152e44 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelContext.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.common.constants.JobMode;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -36,10 +37,12 @@ public final class SeaTunnelContext implements Serializable {
     private static final SeaTunnelContext INSTANCE = new SeaTunnelContext();
 
     // tableName -> tableSchema
-    private Map<String, TableSchema> tableSchemaMap = new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+    private final Map<String, TableSchema> tableSchemaMap = new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
     private JobMode jobMode;
 
+    private final String jobId;
+
     public static SeaTunnelContext getContext() {
         return INSTANCE;
     }
@@ -73,8 +76,12 @@ public final class SeaTunnelContext implements Serializable {
         return jobMode;
     }
 
+    public String getJobId() {
+        return this.jobId;
+    }
+
     private SeaTunnelContext() {
-        // no-op
+        this.jobId = UUID.randomUUID().toString().replace("-", "");
     }
 
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 6ce5a79a..4e6a6d99 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -43,7 +43,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
     void write(T element) throws IOException;
 
     /**
-     * prepare the commit, will be called before {@link #snapshotState()}.
+     * Prepare the commit; this method is called before {@link #snapshotState(long checkpointId)}.
      * If you need to use 2pc, you can return the commit info in this method, and receive the commit info in {@link SinkCommitter#commit(List)}.
      *
      * @return the commit info need to commit
@@ -54,7 +54,7 @@ public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
      * @return The writer's state.
      * @throws IOException if fail to snapshot writer's state.
      */
-    default List<StateT> snapshotState() throws IOException {
+    default List<StateT> snapshotState(long checkpointId) throws IOException {
         return Collections.emptyList();
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 4f222a9d..d7f1d5e6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -77,7 +77,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     @Override
-    public List<KafkaSinkState> snapshotState() {
+    public List<KafkaSinkState> snapshotState(long checkpointId) {
         return kafkaProducerSender.snapshotState();
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index 346eae70..0218fb7a 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -37,7 +37,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 @SuppressWarnings("unchecked")
-public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable, Serializable, Serializable> {
+public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink<InputT, Serializable,
+        FlinkWriterState<WriterStateT>, Serializable> {
 
     private final SeaTunnelSink<InputT, WriterStateT, CommT, GlobalCommT> sink;
     private final Map<String, String> configuration;
@@ -49,17 +50,20 @@ public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink
     }
 
     @Override
-    public SinkWriter<InputT, Serializable, Serializable> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<Serializable> states) throws IOException {
+    public SinkWriter<InputT, Serializable, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
         // TODO add subtask and parallelism.
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
                 new DefaultSinkWriterContext(configuration, 0, 0);
 
-        FlinkSinkWriterConverter<InputT, Serializable, Serializable> converter = new FlinkSinkWriterConverter<>();
-
         if (states == null || states.isEmpty()) {
+            FlinkSinkWriterConverter<InputT, Serializable, WriterStateT> converter =
+                    new FlinkSinkWriterConverter<>(1);
             return converter.convert(sink.createWriter(stContext));
         } else {
-            return converter.convert(sink.restoreWriter(stContext, states.stream().map(s -> (WriterStateT) s).collect(Collectors.toList())));
+            FlinkSinkWriterConverter<InputT, Serializable, WriterStateT> converter =
+                    new FlinkSinkWriterConverter<>(states.get(0).getCheckpointId());
+            return converter.convert(sink.restoreWriter(stContext,
+                    states.stream().map(FlinkWriterState::getState).collect(Collectors.toList())));
         }
     }
 
@@ -95,9 +99,9 @@ public class FlinkSink<InputT, WriterStateT, CommT, GlobalCommT> implements Sink
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<Serializable>> getWriterStateSerializer() {
-        final FlinkSimpleVersionedSerializerConverter<Serializable> converter = new FlinkSimpleVersionedSerializerConverter<>();
+    public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>> getWriterStateSerializer() {
+        final FlinkSimpleVersionedSerializerConverter<FlinkWriterState<WriterStateT>> converter = new FlinkSimpleVersionedSerializerConverter<>();
         final Optional<Serializer<WriterStateT>> writerStateTSerializer = sink.getWriterStateSerializer();
-        return writerStateTSerializer.map(serializer -> converter.convert((Serializer<Serializable>) serializer));
+        return writerStateTSerializer.map(serializer -> converter.convert((Serializer<FlinkWriterState<WriterStateT>>) serializer));
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 9f4f1a7b..38ba20c2 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -29,14 +29,18 @@ import java.io.InvalidClassException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
-public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, WriterStateT> {
+public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>> {
 
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
     private final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+    private long checkpointId;
 
-    FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter) {
+    FlinkSinkWriter(org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
+                    long checkpointId) {
         this.sinkWriter = sinkWriter;
+        this.checkpointId = checkpointId;
     }
 
     @Override
@@ -57,8 +61,11 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
     }
 
     @Override
-    public List<WriterStateT> snapshotState() throws IOException {
-        return sinkWriter.snapshotState();
+    public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
+        List<FlinkWriterState<WriterStateT>> states = sinkWriter.snapshotState(this.checkpointId)
+                .stream().map(state -> new FlinkWriterState<>(this.checkpointId, state)).collect(Collectors.toList());
+        this.checkpointId++;
+        return states;
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
index 9520b27d..e86cd5d6 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
@@ -20,10 +20,16 @@ package org.apache.seatunnel.translation.flink.sink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.translation.sink.SinkWriterConverter;
 
-public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> {
+public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>>> {
+
+    private final long checkpointId;
+
+    FlinkSinkWriterConverter(long checkpointId) {
+        this.checkpointId = checkpointId;
+    }
 
     @Override
-    public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) {
-        return new FlinkSinkWriter(sinkWriter);
+    public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, FlinkWriterState<WriterStateT>> convert(SinkWriter<?, ?, ?> sinkWriter) {
+        return new FlinkSinkWriter(sinkWriter, this.checkpointId);
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 59%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
index 9520b27d..095fb07d 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
@@ -17,13 +17,32 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.translation.sink.SinkWriterConverter;
+import java.io.Serializable;
 
-public class FlinkSinkWriterConverter<InputT, CommT, WriterStateT> implements SinkWriterConverter<org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT>> {
+public class FlinkWriterState<StateT> implements Serializable {
 
-    @Override
-    public org.apache.flink.api.connector.sink.SinkWriter<InputT, CommT, WriterStateT> convert(SinkWriter<?, ?, ?> sinkWriter) {
-        return new FlinkSinkWriter(sinkWriter);
+    private long checkpointId = 0;
+
+    private StateT state;
+
+    public FlinkWriterState(long checkpointId, StateT state) {
+        this.checkpointId = checkpointId;
+        this.state = state;
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public void setCheckpointId(long checkpointId) {
+        this.checkpointId = checkpointId;
+    }
+
+    public StateT getState() {
+        return state;
+    }
+
+    public void setState(StateT state) {
+        this.state = state;
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 8aab52db..95ed6fd3 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -42,13 +42,15 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
     private final SinkCommitter<CommitInfoT> sinkCommitter;
     private final RowSerialization<InternalRow> rowSerialization;
     private CommitInfoT latestCommitInfoT;
+    private long epochId;
 
     SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
                     @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
-                    StructType schema) {
+                    StructType schema, long epochId) {
         this.sinkWriter = sinkWriter;
         this.sinkCommitter = sinkCommitter;
         this.rowSerialization = new InternalRowSerialization(schema);
+        this.epochId = epochId == 0 ? 1 : epochId;
     }
 
     @Override
@@ -66,6 +68,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements DataWriter<Internal
         //   2.1. We have the commit info, we need to execute the sinkCommitter#abort to rollback the transaction.
         Optional<CommitInfoT> commitInfoTOptional = sinkWriter.prepareCommit();
         commitInfoTOptional.ifPresent(commitInfoT -> latestCommitInfoT = commitInfoT);
+        sinkWriter.snapshotState(epochId++);
         if (sinkCommitter != null) {
             if (latestCommitInfoT == null) {
                 sinkCommitter.commit(Collections.emptyList());
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
index 4c8e239c..a3654ce6 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriterFactory.java
@@ -45,7 +45,7 @@ public class SparkDataWriterFactory<CommitInfoT, StateT> implements DataWriterFa
 
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-        // TODO use partitionID, taskId, epochId information.
-        return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema);
+        // TODO use the partitionId and taskId information.
+        return new SparkDataWriter<>(sinkWriter, sinkCommitter, schema, epochId);
     }
 }