You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 06:56:45 UTC

[incubator-seatunnel] branch api-draft updated: [API-Draft] Add comment on common interface. (#1888)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new fbeab83a [API-Draft] Add comment on common interface. (#1888)
fbeab83a is described below

commit fbeab83a5e5560e033c9be6d007ca7937c4edba4
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon May 16 14:56:40 2022 +0800

    [API-Draft] Add comment on common interface. (#1888)
    
    * Add comment on common interface.
    * add serializable interface
---
 .../api/serialization/DefaultSerializer.java       |  9 +++-
 .../api/sink/DefaultSinkWriterContext.java         |  3 ++
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   | 54 ++++++++++++++++++++--
 .../api/sink/SinkAggregatedCommitter.java          | 31 +++++++++++++
 .../apache/seatunnel/api/sink/SinkCommitter.java   | 17 +++++++
 .../org/apache/seatunnel/api/sink/SinkWriter.java  | 16 ++++++-
 .../api/table/factory/TableSinkFactory.java        |  1 +
 7 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index c17a1d58..bf2c6358 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -20,13 +20,18 @@ package org.apache.seatunnel.api.serialization;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 
 import java.io.IOException;
+import java.io.InvalidClassException;
 import java.io.Serializable;
 
-public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
+public class DefaultSerializer<T> implements Serializer<T> {
 
     @Override
     public byte[] serialize(T obj) throws IOException {
-        return SerializationUtils.serialize(obj);
+        if (obj instanceof Serializable) {
+            return SerializationUtils.serialize((Serializable) obj);
+        } else {
+            throw new InvalidClassException(obj.getClass() + " must implement java.io.Serializable interface");
+        }
     }
 
     @Override
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index aeba2dd1..8c8219db 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -19,6 +19,9 @@ package org.apache.seatunnel.api.sink;
 
 import java.util.Map;
 
+/**
+ * The default implementation of {@link SinkWriter.Context}.
+ */
 public class DefaultSinkWriterContext implements SinkWriter.Context {
 
     private final Map<String, String> configuration;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cb6a4247..00427e63 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 
 import java.io.IOException;
@@ -24,8 +25,28 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Optional;
 
+/**
+ * The SeaTunnel sink interface. Developers implement this interface when creating a sink connector.
+ *
+ * @param <IN>                    The type of data the sink accepts. Currently only
+ *                                {@link org.apache.seatunnel.api.table.type.SeaTunnelRow} is supported.
+ * @param <StateT>                The type of writer state saved while the job executes; this class should implement
+ *                                {@link Serializable}.
+ * @param <CommitInfoT>           The type of commit message returned by {@link SinkWriter#prepareCommit()}, which is then
+ *                                handled by {@link SinkCommitter} or {@link SinkAggregatedCommitter}; this class should implement
+ *                                {@link Serializable}.
+ * @param <AggregatedCommitInfoT> The type of aggregated commit message, combined from {@code CommitInfoT} messages and
+ *                                handled by {@link SinkAggregatedCommitter}; this class should implement {@link Serializable}.
+ */
 public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
+    /**
+     * Creates a {@link SinkWriter} for this sink.
+     *
+     * @param context The sink writer context.
+     * @return The sink writer instance.
+     * @throws IOException if the writer cannot be created.
+     */
     SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
 
     default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(SinkWriter.Context context,
@@ -33,23 +54,50 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
         return createWriter(context);
     }
 
+    /**
+     * Gets the serializer of {@code StateT}, so that writer state can be transferred across processes.
+     *
+     * @return Serializer of {@code StateT}; by default a {@link DefaultSerializer} (requires {@link Serializable}).
+     */
     default Optional<Serializer<StateT>> getWriterStateSerializer() {
-        return Optional.empty();
+        return Optional.of(new DefaultSerializer<>());
     }
 
+    /**
+     * Creates the {@link SinkCommitter} for this sink.
+     *
+     * @return The sink committer instance; empty by default.
+     * @throws IOException if the committer cannot be created.
+     */
     default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
         return Optional.empty();
     }
 
+    /**
+     * Gets the serializer of {@code CommitInfoT}, so that commit messages can be transferred across processes.
+     *
+     * @return Serializer of {@code CommitInfoT}; by default a {@link DefaultSerializer} (requires {@link Serializable}).
+     */
     default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
-        return Optional.empty();
+        return Optional.of(new DefaultSerializer<>());
     }
 
+    /**
+     * Creates the {@link SinkAggregatedCommitter} for this sink.
+     *
+     * @return The sink aggregated committer instance; empty by default.
+     * @throws IOException if the aggregated committer cannot be created.
+     */
     default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException {
         return Optional.empty();
     }
 
+    /**
+     * Gets the serializer of {@code AggregatedCommitInfoT}, so that aggregated commit messages can be transferred across processes.
+     *
+     * @return Serializer of {@code AggregatedCommitInfoT}; by default a {@link DefaultSerializer} (requires {@link Serializable}).
+     */
     default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
-        return Optional.empty();
+        return Optional.of(new DefaultSerializer<>());
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index b9d386a2..3cbf0314 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -21,13 +21,44 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+/**
+ * The committer that combines the commit messages from taskManagers/workers, then commits them via
+ * {@link SinkAggregatedCommitter#commit(List)}. This class is executed in a single thread.
+ *
+ * @param <CommitInfoT>           The type of commit message.
+ * @param <AggregatedCommitInfoT> The type of the combined commit message.
+ */
 public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
 
+    /**
+     * Commits the combined commit messages to the third-party data receiver.
+     *
+     * @param aggregatedCommitInfo The list of combined commit messages.
+     * @return The commit messages that need to be retried.
+     * @throws IOException if the commit fails.
+     */
     List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
 
+    /**
+     * Combines the given commit messages into a single aggregated commit message.
+     *
+     * @param commitInfos The list of commit messages.
+     * @return The combined commit message.
+     */
     AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
 
+    /**
+     * Aborts the given combined commit messages; called when the commit fails.
+     *
+     * @param aggregatedCommitInfo The list of combined commit messages.
+     * @throws Exception if the abort fails.
+     */
     void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
 
+    /**
+     * Closes this committer.
+     *
+     * @throws IOException if closing fails.
+     */
     void close() throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index f734f12a..6fff7762 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -21,9 +21,26 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+/**
+ * The committer that commits commit messages.
+ *
+ * @param <CommitInfoT> The type of commit message.
+ */
 public interface SinkCommitter<CommitInfoT> extends Serializable {
 
+    /**
+     * Commits the commit messages to the third-party data receiver.
+     *
+     * @param committables The list of commit messages.
+     * @return The commit messages that need to be retried.
+     * @throws IOException if the commit fails.
+     */
     List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
 
+    /**
+     * Aborts the commit (presumably invoked after a commit failure; TODO confirm).
+     *
+     * @throws IOException if aborting fails.
+     */
     void abort() throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index eeaa66b7..456e78de 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -23,8 +23,22 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable{
+/**
+ * The sink writer used to write data to the third-party data receiver. This class runs on the taskManager/worker.
+ *
+ * @param <T>           The type of data the sink accepts. Currently only
+ *                      {@link org.apache.seatunnel.api.table.type.SeaTunnelRow} is supported.
+ * @param <CommitInfoT> The type of commit message.
+ * @param <StateT>      The type of state.
+ */
+public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
 
+    /**
+     * Writes data to the third-party data receiver.
+     *
+     * @param element The data to be written.
+     * @throws IOException if writing the data fails.
+     */
     void write(T element) throws IOException;
 
     /**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 8d941885..b46c66c2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -31,4 +31,5 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory {
 
     TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(TableFactoryContext context);
+
 }