You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/16 06:56:45 UTC
[incubator-seatunnel] branch api-draft updated: [API-Draft] Add comment on common interface. (#1888)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new fbeab83a [API-Draft] Add comment on common interface. (#1888)
fbeab83a is described below
commit fbeab83a5e5560e033c9be6d007ca7937c4edba4
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Mon May 16 14:56:40 2022 +0800
[API-Draft] Add comment on common interface. (#1888)
* Add comment on common interface.
* add serializable interface
---
.../api/serialization/DefaultSerializer.java | 9 +++-
.../api/sink/DefaultSinkWriterContext.java | 3 ++
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 54 ++++++++++++++++++++--
.../api/sink/SinkAggregatedCommitter.java | 31 +++++++++++++
.../apache/seatunnel/api/sink/SinkCommitter.java | 17 +++++++
.../org/apache/seatunnel/api/sink/SinkWriter.java | 16 ++++++-
.../api/table/factory/TableSinkFactory.java | 1 +
7 files changed, 125 insertions(+), 6 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index c17a1d58..bf2c6358 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -20,13 +20,18 @@ package org.apache.seatunnel.api.serialization;
import org.apache.seatunnel.common.utils.SerializationUtils;
import java.io.IOException;
+import java.io.InvalidClassException;
import java.io.Serializable;
-public class DefaultSerializer<T extends Serializable> implements Serializer<T> {
+public class DefaultSerializer<T> implements Serializer<T> {
@Override
public byte[] serialize(T obj) throws IOException {
- return SerializationUtils.serialize(obj);
+ if (obj instanceof Serializable) {
+ return SerializationUtils.serialize((Serializable) obj);
+ } else {
+ throw new InvalidClassException(obj.getClass() + " must implement java.io.Serializable interface");
+ }
}
@Override
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
index aeba2dd1..8c8219db 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSinkWriterContext.java
@@ -19,6 +19,9 @@ package org.apache.seatunnel.api.sink;
import java.util.Map;
+/**
+ * The default {@link SinkWriter.Context} implementation class.
+ */
public class DefaultSinkWriterContext implements SinkWriter.Context {
private final Map<String, String> configuration;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cb6a4247..00427e63 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import java.io.IOException;
@@ -24,8 +25,28 @@ import java.io.Serializable;
import java.util.List;
import java.util.Optional;
+/**
+ * The SeaTunnel sink interface. Developers should implement this interface when creating a sink connector.
+ *
+ * @param <IN> The data class accepted by the sink. Only
+ * {@link org.apache.seatunnel.api.table.type.SeaTunnelRow} is supported for now.
+ * @param <StateT> The state that should be saved while the job executes; this class should implement interface
+ * {@link Serializable}.
+ * @param <CommitInfoT> The commit message class returned by {@link SinkWriter#prepareCommit()}, which is then
+ * handled by {@link SinkCommitter} or {@link SinkAggregatedCommitter}; this class should implement interface
+ * {@link Serializable}.
+ * @param <AggregatedCommitInfoT> The aggregated commit message class, combined from {@link CommitInfoT} and
+ * handled by {@link SinkAggregatedCommitter}; this class should implement interface {@link Serializable}.
+ */
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+ /**
+ * This method will be called to create {@link SinkWriter}.
+ *
+ * @param context The sink context
+ * @return Return sink writer instance
+ * @throws IOException throws IOException when createWriter failed.
+ */
SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(SinkWriter.Context context,
@@ -33,23 +54,50 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> e
return createWriter(context);
}
+ /**
+ * Get {@link StateT} serializer. So that {@link StateT} can be transferred across processes
+ *
+ * @return Serializer of {@link StateT}
+ */
default Optional<Serializer<StateT>> getWriterStateSerializer() {
- return Optional.empty();
+ return Optional.of(new DefaultSerializer<>());
}
+ /**
+ * This method will be called to create {@link SinkCommitter}
+ *
+ * @return Return sink committer instance
+ * @throws IOException throws IOException when createCommitter failed.
+ */
default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
return Optional.empty();
}
+ /**
+ * Get {@link CommitInfoT} serializer. So that {@link CommitInfoT} can be transferred across processes
+ *
+ * @return Serializer of {@link CommitInfoT}
+ */
default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
- return Optional.empty();
+ return Optional.of(new DefaultSerializer<>());
}
+ /**
+ * This method will be called to create {@link SinkAggregatedCommitter}
+ *
+ * @return Return sink aggregated committer instance
+ * @throws IOException throws IOException when createAggregatedCommitter failed.
+ */
default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException {
return Optional.empty();
}
+ /**
+ * Get {@link AggregatedCommitInfoT} serializer. So that {@link AggregatedCommitInfoT} can be transferred across processes
+ *
+ * @return Serializer of {@link AggregatedCommitInfoT}
+ */
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
- return Optional.empty();
+ return Optional.of(new DefaultSerializer<>());
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index b9d386a2..3cbf0314 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -21,13 +21,44 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+/**
+ * The committer combines the commit messages from taskManager/Worker, then commits them using
+ * {@link SinkAggregatedCommitter#commit(List)}. This class will execute in a single thread.
+ *
+ * @param <CommitInfoT> The type of commit message.
+ * @param <AggregatedCommitInfoT> The type of commit message after combine.
+ */
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+ /**
+ * Commit message to third party data receiver.
+ *
+ * @param aggregatedCommitInfo The list of combined commit messages.
+ * @return The commit messages which need to be retried.
+ * @throws IOException throw IOException when commit failed.
+ */
List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException;
+ /**
+ * The logic about how to combine commit message.
+ *
+ * @param commitInfos The list of commit message.
+ * @return The commit message after combine.
+ */
AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);
+ /**
+ * If commit failed, this method will be called.
+ *
+ * @param aggregatedCommitInfo The list of combine commit message.
+ * @throws Exception throw Exception when abort failed.
+ */
void abort(List<AggregatedCommitInfoT> aggregatedCommitInfo) throws Exception;
+ /**
+ * Close this resource.
+ *
+ * @throws IOException throw IOException when close failed.
+ */
void close() throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index f734f12a..6fff7762 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -21,9 +21,26 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+/**
+ * The committer to commit message.
+ *
+ * @param <CommitInfoT> The type of commit message.
+ */
public interface SinkCommitter<CommitInfoT> extends Serializable {
+ /**
+ * Commit message to third party data receiver.
+ *
+ * @param committables The list of commit message
+ * @return The commit message need retry.
+ * @throws IOException throw IOException when commit failed.
+ */
List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException;
+ /**
+ * Abort the commit.
+ *
+ * @throws IOException throw IOException when abort failed.
+ */
void abort() throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index eeaa66b7..456e78de 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -23,8 +23,22 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable{
+/**
+ * The sink writer is used to write data to a third-party data receiver. This class will run on taskManager/Worker.
+ *
+ * @param <T> The data class accepted by the sink. Only
+ * {@link org.apache.seatunnel.api.table.type.SeaTunnelRow} is supported for now.
+ * @param <CommitInfoT> The type of commit message.
+ * @param <StateT> The type of state.
+ */
+public interface SinkWriter<T, CommitInfoT, StateT> extends Serializable {
+ /**
+ * write data to third party data receiver.
+ *
+ * @param element the data that needs to be written.
+ * @throws IOException throw IOException when write data failed.
+ */
void write(T element) throws IOException;
/**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 8d941885..b46c66c2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -31,4 +31,5 @@ import org.apache.seatunnel.api.table.connector.TableSink;
public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory {
TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(TableFactoryContext context);
+
}