You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/07 02:50:23 UTC
[incubator-seatunnel] branch dev updated: [Improvement][new api] refer to https://github.com/apache/incubator-seatunnel/issues/2127 (#2144)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e19660a04 [Improvement][new api] refer to https://github.com/apache/incubator-seatunnel/issues/2127 (#2144)
e19660a04 is described below
commit e19660a04961887ee1020aeb34218f3f0e97eee9
Author: Jared Li <lh...@gmail.com>
AuthorDate: Thu Jul 7 10:50:18 2022 +0800
[Improvement][new api] refer to https://github.com/apache/incubator-seatunnel/issues/2127 (#2144)
1. Rename the interface SeaTunnelRuntimeEnvironment to SeaTunnelContextAware
2. Serializer doesn't need to extend Serializable
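3. Unify the Enumerator concept: rename sendSourceEventToCoordinator to sendSourceEventToEnumerator in SourceReader and its reader-context implementations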
---
.../main/java/org/apache/seatunnel/api/serialization/Serializer.java | 3 +--
.../src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java | 4 ++--
.../{SeaTunnelRuntimeEnvironment.java => SeaTunnelContextAware.java} | 2 +-
.../main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java | 2 +-
.../src/main/java/org/apache/seatunnel/api/source/SourceReader.java | 2 +-
.../connectors/seatunnel/common/source/AbstractSingleSplitSource.java | 2 +-
.../apache/seatunnel/translation/source/CoordinatedReaderContext.java | 2 +-
.../apache/seatunnel/translation/source/ParallelReaderContext.java | 2 +-
8 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index ffdd9421d..617034864 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -18,9 +18,8 @@
package org.apache.seatunnel.api.serialization;
import java.io.IOException;
-import java.io.Serializable;
-public interface Serializer<T> extends Serializable {
+public interface Serializer<T> {
/**
* Serializes the given object.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 48d8f33da..59517e409 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.api.source.SeaTunnelRuntimeEnvironment;
+import org.apache.seatunnel.api.source.SeaTunnelContextAware;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -43,7 +43,7 @@ import java.util.Optional;
* {@link SinkAggregatedCommitter} handle it, this class should implement interface {@link Serializable}.
*/
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
/**
* Set the row type info of sink row data. This method will be automatically called by translation.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
similarity index 95%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
index c0a51dd57..429f05155 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelContextAware.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.common.SeaTunnelContext;
/**
* This interface defines the runtime environment of the SeaTunnel application.
*/
-public interface SeaTunnelRuntimeEnvironment {
+public interface SeaTunnelContextAware {
default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
// nothing
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 3e1cb8702..ca3a9f900 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
* @param <StateT> The type of checkpoint states.
*/
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
- extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelRuntimeEnvironment {
+ extends Serializable, PluginIdentifierInterface, SeaTunnelPluginLifeCycle, SeaTunnelContextAware {
/**
* Get the boundedness of this source.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index b8505902e..72b3e7eb5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -114,6 +114,6 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
*
* @param sourceEvent the source event to coordinator.
*/
- void sendSourceEventToCoordinator(SourceEvent sourceEvent);
+ void sendSourceEventToEnumerator(SourceEvent sourceEvent);
}
}
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
index f2968c833..255150d9e 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
@@ -29,7 +29,7 @@ public abstract class AbstractSingleSplitSource<T> implements SeaTunnelSource<T,
@Override
public final AbstractSingleSplitReader<T> createReader(SourceReader.Context readerContext) throws Exception {
- checkArgument(readerContext.getIndexOfSubtask() == 0, "Single split source allows only a single reader to be created.");
+ checkArgument(readerContext.getIndexOfSubtask() == 0, "A single split source allows only one single reader to be created.");
return createReader(new SingleSplitReaderContext(readerContext));
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
index 6f410844b..ff888f038 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedReaderContext.java
@@ -56,7 +56,7 @@ public class CoordinatedReaderContext implements SourceReader.Context {
}
@Override
- public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
coordinatedSource.handleReaderEvent(subtaskId, sourceEvent);
}
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
index 24c082cf6..cbda3201c 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -56,7 +56,7 @@ public class ParallelReaderContext implements SourceReader.Context {
}
@Override
- public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
// TODO: exception
throw new RuntimeException("");
}