You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/27 05:58:09 UTC
[incubator-seatunnel] branch api-draft updated: [api-draft][connector] Add simplified connector api (#2041)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new a7357b6a9 [api-draft][connector] Add simplified connector api (#2041)
a7357b6a9 is described below
commit a7357b6a9bf6f9adc7986b21638ae1bb62eca308
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Mon Jun 27 13:58:05 2022 +0800
[api-draft][connector] Add simplified connector api (#2041)
---
.../api/source/SeaTunnelRuntimeEnvironment.java | 11 +--
.../seatunnel/api/source/SeaTunnelSource.java | 6 +-
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../clickhouse/source/ClickhouseSource.java | 11 ---
.../pom.xml | 7 +-
.../seatunnel/common/sink/AbstractSimpleSink.java | 59 ++++++++++++++++
.../seatunnel/common/sink/AbstractSinkWriter.java} | 18 ++---
.../common/source/AbstractSingleSplitReader.java | 67 +++++++++++++++++++
.../common/source/AbstractSingleSplitSource.java | 58 ++++++++++++++++
.../seatunnel/common/source/SingleSplit.java} | 15 +++--
.../common/source/SingleSplitEnumerator.java} | 51 ++++++++------
.../common/source/SingleSplitEnumeratorState.java} | 5 +-
.../common/source/SingleSplitReaderContext.java} | 24 +++----
.../seatunnel-connector-seatunnel-console/pom.xml | 2 +-
.../console/sink/ConsoleAggregatedCommitInfo.java | 21 ------
.../seatunnel/console/sink/ConsoleCommitInfo.java | 21 ------
.../seatunnel/console/sink/ConsoleSink.java | 26 ++------
.../seatunnel/console/sink/ConsoleSinkWriter.java | 18 +----
.../seatunnel-connector-seatunnel-fake/pom.xml | 3 +-
.../seatunnel/fake/source/FakeSource.java | 42 ++++--------
.../seatunnel/fake/source/FakeSourceEvent.java | 45 -------------
.../seatunnel/fake/source/FakeSourceReader.java | 34 ++--------
.../fake/source/FakeSourceSplitEnumerator.java | 78 ----------------------
.../fake/source/FakeSupportCoordinate.java | 23 -------
.../connectors/seatunnel/fake/state/FakeState.java | 23 -------
.../connectors/seatunnel/hive/sink/HiveSink.java | 11 ---
.../seatunnel-connector-seatunnel-http/pom.xml | 2 +-
.../seatunnel/http/source/HttpSource.java | 40 ++++-------
.../seatunnel/http/source/HttpSourceReader.java | 30 ++-------
.../connectors/seatunnel/http/state/HttpState.java | 24 -------
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 --
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 12 ----
.../seatunnel/kafka/source/KafkaSource.java | 12 ++--
.../seatunnel/pulsar/source/PulsarSource.java | 12 ----
.../seatunnel-connector-seatunnel-socket/pom.xml | 3 +-
.../seatunnel/socket/source/SocketSource.java | 40 ++++-------
.../socket/source/SocketSourceReader.java | 31 ++-------
.../socket/source/SocketSourceSplitEnumerator.java | 77 ---------------------
.../batch/CoordinatedBatchPartitionReader.java | 2 +-
.../CoordinatedMicroBatchPartitionReader.java | 2 +-
40 files changed, 333 insertions(+), 639 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
index 166306369..c0a51dd57 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
@@ -24,12 +24,7 @@ import org.apache.seatunnel.api.common.SeaTunnelContext;
*/
public interface SeaTunnelRuntimeEnvironment {
- /**
- * Returns the SeaTunnel runtime context.
- *
- * @return seaTunnelContext
- */
- SeaTunnelContext getSeaTunnelContext();
-
- void setSeaTunnelContext(SeaTunnelContext seaTunnelContext);
+ default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+ // nothing
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index b37a36002..3e1cb8702 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.constants.JobMode;
import java.io.Serializable;
@@ -42,10 +41,7 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
*
* @return the boundedness of this source.
*/
- default Boundedness getBoundedness() {
- return JobMode.BATCH.equals(getSeaTunnelContext().getJobMode()) ?
- Boundedness.BOUNDED : Boundedness.UNBOUNDED;
- }
+ Boundedness getBoundedness();
/**
* Get the data type of the records produced by this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index b4a9ac77f..6b94005e3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -31,6 +31,7 @@
<artifactId>seatunnel-connectors-seatunnel</artifactId>
<modules>
+ <module>seatunnel-connector-seatunnel-common</module>
<module>seatunnel-connector-seatunnel-console</module>
<module>seatunnel-connector-seatunnel-fake</module>
<module>seatunnel-connector-seatunnel-kafka</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index b8523ca82..c5166d8b7 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -24,7 +24,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Boundedness;
@@ -58,7 +57,6 @@ import java.util.stream.Collectors;
@AutoService(SeaTunnelSource.class)
public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState> {
- private SeaTunnelContext seaTunnelContext;
private List<ClickHouseNode> servers;
private SeaTunnelRowType rowTypeInfo;
private String sql;
@@ -139,13 +137,4 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
return new DefaultSerializer<>();
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
similarity index 93%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
index 447067c7d..26bfb0c24 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
@@ -27,7 +27,10 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-common</artifactId>
+
+ <properties>
+ </properties>
<dependencies>
<dependency>
@@ -35,7 +38,5 @@
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
-
</dependencies>
-
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java
new file mode 100644
index 000000000..703243b70
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.sink;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public abstract class AbstractSimpleSink<T, StateT> implements SeaTunnelSink<T, StateT, Void, Void> {
+
+ @Override
+ public abstract AbstractSinkWriter<T, StateT> createWriter(SinkWriter.Context context) throws IOException;
+
+ @Override
+ public SinkWriter<T, Void, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+ return createWriter(context);
+ }
+
+ @Override
+ public final Optional<SinkCommitter<Void>> createCommitter() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public final Optional<Serializer<Void>> getCommitInfoSerializer() {
+ return Optional.empty();
+ }
+
+ @Override
+ public final Optional<SinkAggregatedCommitter<Void, Void>> createAggregatedCommitter() throws IOException {
+ return Optional.empty();
+ }
+
+ @Override
+ public final Optional<Serializer<Void>> getAggregatedCommitInfoSerializer() {
+ return Optional.empty();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
similarity index 68%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
index cfdd2cdac..92c19dbf6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
@@ -15,19 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
+package org.apache.seatunnel.connectors.seatunnel.common.sink;
-import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.sink.SinkWriter;
-public class SocketSourceSplit implements SourceSplit {
- private final String splitId;
+import java.util.Optional;
- public SocketSourceSplit(String splitId) {
- this.splitId = splitId;
+public abstract class AbstractSinkWriter<T, StateT> implements SinkWriter<T, Void, StateT> {
+
+ @Override
+ public final Optional<Void> prepareCommit() {
+ return Optional.empty();
}
@Override
- public String splitId() {
- return splitId;
+ public final void abortPrepare() {
+ // nothing
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java
new file mode 100644
index 000000000..14d98b65d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractSingleSplitReader<T> implements SourceReader<T, SingleSplit> {
+
+ @Override
+ public final List<SingleSplit> snapshotState(long checkpointId) throws Exception {
+ return Collections.singletonList(new SingleSplit(snapshotStateToBytes(checkpointId)));
+ }
+
+ protected byte[] snapshotStateToBytes(long checkpointId) throws Exception {
+ // default nothing
+ return null;
+ }
+
+ @Override
+ public final void addSplits(List<SingleSplit> splits) {
+ if (splits.size() > 1) {
+ throw new UnsupportedOperationException("The single-split reader don't support reading multiple splits");
+ }
+ byte[] restoredState = splits.get(0).getState();
+ if (restoredState != null && restoredState.length > 0) {
+ restoreState(restoredState);
+ }
+ }
+
+ protected void restoreState(byte[] restoredState) {
+ // default nothing
+ }
+
+ @Override
+ public final void handleNoMoreSplits() {
+ // nothing
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // default nothing
+ }
+
+ @Override
+ public final void handleSourceEvent(SourceEvent sourceEvent) {
+ // nothing
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
new file mode 100644
index 000000000..f2968c833
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+public abstract class AbstractSingleSplitSource<T> implements SeaTunnelSource<T, SingleSplit, SingleSplitEnumeratorState> {
+
+ @Override
+ public final AbstractSingleSplitReader<T> createReader(SourceReader.Context readerContext) throws Exception {
+ checkArgument(readerContext.getIndexOfSubtask() == 0, "Single split source allows only a single reader to be created.");
+ return createReader(new SingleSplitReaderContext(readerContext));
+ }
+
+ public abstract AbstractSingleSplitReader<T> createReader(SingleSplitReaderContext readerContext) throws Exception;
+
+ @Override
+ public final SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<SingleSplit> enumeratorContext) throws Exception {
+ return new SingleSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public final SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<SingleSplit> enumeratorContext, SingleSplitEnumeratorState checkpointState) throws Exception {
+ return createEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public final Serializer<SingleSplitEnumeratorState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+
+ @Override
+ public final Serializer<SingleSplit> getSplitSerializer() {
+ return new DefaultSerializer<>();
+ }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
similarity index 75%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
index 3496c546c..02951f467 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
@@ -15,20 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.http.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
import org.apache.seatunnel.api.source.SourceSplit;
-public class HttpSourceSplit implements SourceSplit {
+public class SingleSplit implements SourceSplit {
+ private final byte[] state;
- private final String splitId;
+ public SingleSplit(byte[] state) {
+ this.state = state;
+ }
- public HttpSourceSplit(String splitId) {
- this.splitId = splitId;
+ public byte[] getState() {
+ return state;
}
@Override
public String splitId() {
- return this.splitId;
+ return "single";
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
similarity index 54%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
index f67d3335a..74049f1cc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
@@ -15,45 +15,56 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.http.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
import java.io.IOException;
import java.util.List;
-public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSourceSplit, HttpState> {
- private SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext;
- private HttpState httpState;
+public class SingleSplitEnumerator implements SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> {
+ protected final SourceSplitEnumerator.Context<SingleSplit> context;
+ protected SingleSplit pendingSplit;
+ protected volatile boolean assigned = false;
- public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) {
- this.enumeratorContext = enumeratorContext;
- }
-
- public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext, HttpState httpState) {
- this.enumeratorContext = enumeratorContext;
- this.httpState = httpState;
+ public SingleSplitEnumerator(SourceSplitEnumerator.Context<SingleSplit> context) {
+ this.context = context;
}
@Override
public void open() {
-
+ // nothing
}
@Override
public void run() throws Exception {
+ if (assigned || pendingSplit != null) {
+ return;
+ }
+ pendingSplit = new SingleSplit(null);
+ assignSplit();
}
@Override
public void close() throws IOException {
-
+ // nothing
}
@Override
- public void addSplitsBack(List<HttpSourceSplit> splits, int subtaskId) {
+ public void addSplitsBack(List<SingleSplit> splits, int subtaskId) {
+ pendingSplit = splits.get(0);
+ assignSplit();
+ }
+ protected void assignSplit() {
+ if (assigned || pendingSplit == null) {
+ return;
+ }
+ if (context.registeredReaders().contains(0)) {
+ context.assignSplit(0, pendingSplit);
+ assigned = true;
+ }
}
@Override
@@ -63,21 +74,21 @@ public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSour
@Override
public void handleSplitRequest(int subtaskId) {
-
+ // nothing
}
@Override
public void registerReader(int subtaskId) {
-
+ assignSplit();
}
@Override
- public HttpState snapshotState(long checkpointId) throws Exception {
- return null;
+ public SingleSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+ return new SingleSplitEnumeratorState();
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
+ // nothing
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
similarity index 86%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
index 3eda1f4c5..e00594e64 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.console.state;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
import java.io.Serializable;
-public class ConsoleState implements Serializable {
-
+public class SingleSplitEnumeratorState implements Serializable {
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
similarity index 61%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
index 7276516f0..5c56e5730 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
-import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SourceReader;
-public class FakeSourceSplit implements SourceSplit {
+public class SingleSplitReaderContext {
+ private final SourceReader.Context context;
- private static final long serialVersionUID = -1L;
-
- private final String splitId;
-
- public FakeSourceSplit(String splitId) {
- this.splitId = splitId;
+ public SingleSplitReaderContext(SourceReader.Context context) {
+ this.context = context;
}
- @Override
- public String splitId() {
- return splitId;
+ public Boundedness getBoundedness() {
+ return context.getBoundedness();
}
+ public void signalNoMoreElement() {
+ context.signalNoMoreElement();
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
index 2ad1dceb2..ebd392f85 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
@@ -30,7 +30,7 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
deleted file mode 100644
index d866e0844..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.console.sink;
-
-public class ConsoleAggregatedCommitInfo {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
deleted file mode 100644
index 6c41be0ca..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.console.sink;
-
-public class ConsoleCommitInfo {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 36310d161..fee2a1fb5 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,25 +17,22 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
-import java.util.List;
-
@AutoService(SeaTunnelSink.class)
-public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
+public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private Config pluginConfig;
- private SeaTunnelContext seaTunnelContext;
private SeaTunnelRowType seaTunnelRowType;
@Override
@@ -49,16 +46,10 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
}
@Override
- public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> createWriter(SinkWriter.Context context) {
+ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new ConsoleSinkWriter(seaTunnelRowType);
}
- @Override
- public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> restoreWriter(
- SinkWriter.Context context, List<ConsoleState> states) {
- return restoreWriter(context, states);
- }
-
@Override
public String getPluginName() {
return "Console";
@@ -69,13 +60,4 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
this.pluginConfig = pluginConfig;
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 09e386d3e..e226f1623 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,18 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import java.util.Optional;
-public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> {
+public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
@@ -44,18 +42,8 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
System.out.println(Arrays.toString(element.getFields()));
}
- @Override
- public Optional<ConsoleCommitInfo> prepareCommit() {
- return Optional.empty();
- }
-
- @Override
- public void abortPrepare() {
-
- }
-
@Override
public void close() {
-
+ // nothing
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
index dfc8b76aa..727bd329d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
@@ -32,10 +32,9 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
-
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index d641c30a9..65e6587f4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -18,27 +18,32 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
-public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
+public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
private SeaTunnelContext seaTunnelContext;
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ }
+
@Override
public SeaTunnelRowType getProducedType() {
return new SeaTunnelRowType(
@@ -47,28 +52,10 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
}
@Override
- public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new FakeSourceReader(readerContext);
}
- @Override
- public SourceSplitEnumerator<FakeSourceSplit, FakeState> createEnumerator(
- SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
- return new FakeSourceSplitEnumerator(enumeratorContext);
- }
-
- @Override
- public SourceSplitEnumerator<FakeSourceSplit, FakeState> restoreEnumerator(
- SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeState checkpointState) {
- // todo:
- return null;
- }
-
- @Override
- public Serializer<FakeState> getEnumeratorStateSerializer() {
- return new DefaultSerializer<>();
- }
-
@Override
public String getPluginName() {
return "FakeSource";
@@ -79,11 +66,6 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
this.pluginConfig = pluginConfig;
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
@Override
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
this.seaTunnelContext = seaTunnelContext;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
deleted file mode 100644
index 76e0a3eb7..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SourceEvent;
-
-public class FakeSourceEvent implements SourceEvent {
-
- private final String name;
- private final int age;
- private final long timestamp;
-
- public FakeSourceEvent(String name, int age, long timestamp) {
- this.name = name;
- this.age = age;
- this.timestamp = timestamp;
- }
-
- public String getName() {
- return name;
- }
-
- public int getAge() {
- return age;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 5a387f871..7007eaf25 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,37 +19,37 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
-public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
+public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
- private final SourceReader.Context context;
+ private final SingleSplitReaderContext context;
private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
private final int[] ages = {11, 22, 33, 44};
- public FakeSourceReader(SourceReader.Context context) {
+ public FakeSourceReader(SingleSplitReaderContext context) {
this.context = context;
}
@Override
public void open() {
-
+ // nothing
}
@Override
public void close() {
-
+ // nothing
}
@Override
@@ -70,24 +70,4 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
}
Thread.sleep(1000L);
}
-
- @Override
- public List<FakeSourceSplit> snapshotState(long checkpointId) {
- return null;
- }
-
- @Override
- public void addSplits(List<FakeSourceSplit> splits) {
-
- }
-
- @Override
- public void handleNoMoreSplits() {
-
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
-
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
deleted file mode 100644
index 9b2900c1e..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
-
-import java.io.IOException;
-import java.util.List;
-
-public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeState> {
-
- private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
-
- public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
- this.enumeratorContext = enumeratorContext;
- }
-
- @Override
- public void open() {
-
- }
-
- @Override
- public void run() {
-
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
-
- }
-
- @Override
- public int currentUnassignedSplitSize() {
- return 0;
- }
-
- @Override
- public void handleSplitRequest(int subtaskId) {
-
- }
-
- @Override
- public void registerReader(int subtaskId) {
-
- }
-
- @Override
- public FakeState snapshotState(long checkpointId) throws Exception {
- return null;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
- }
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
deleted file mode 100644
index f7a9b098a..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SupportCoordinate;
-
-public class FakeSupportCoordinate implements SupportCoordinate {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
deleted file mode 100644
index 0c0edecd0..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.state;
-
-import java.io.Serializable;
-
-public class FakeState implements Serializable {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 6ce13bf2f..f1ba12edd 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -78,16 +77,6 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
return new HiveSinkWriter(seaTunnelRowType, config, context, System.currentTimeMillis());
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return null;
- }
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-
- }
-
@Override
public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
index 1e78fb8a8..b8c29614a 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
@@ -32,7 +32,7 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 91f983256..0b906144b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -26,19 +26,19 @@ import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -48,15 +48,21 @@ import java.util.Map;
import java.util.stream.Collectors;
@AutoService(SeaTunnelSource.class)
-public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit, HttpState> {
+public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private final HttpSourceParameter parameter = new HttpSourceParameter();
private SeaTunnelRowType rowType;
private SeaTunnelContext seaTunnelContext;
+
@Override
public String getPluginName() {
return "Http";
}
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ }
+
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL);
@@ -86,11 +92,6 @@ public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit
this.rowType = new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return this.seaTunnelContext;
- }
-
@Override
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
this.seaTunnelContext = seaTunnelContext;
@@ -102,22 +103,7 @@ public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit
}
@Override
- public SourceReader<SeaTunnelRow, HttpSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new HttpSourceReader(this.parameter, readerContext);
}
-
- @Override
- public SourceSplitEnumerator<HttpSourceSplit, HttpState> createEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) throws Exception {
- return new HttpSourceSplitEnumerator(enumeratorContext);
- }
-
- @Override
- public SourceSplitEnumerator<HttpSourceSplit, HttpState> restoreEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext, HttpState checkpointState) throws Exception {
- return new HttpSourceSplitEnumerator(enumeratorContext, checkpointState);
- }
-
- @Override
- public Serializer<HttpState> getEnumeratorStateSerializer() {
- return new DefaultSerializer<>();
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index cc31b2761..f11d6f581 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -21,8 +21,9 @@ import static org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
@@ -30,17 +31,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
import java.util.Objects;
-public class HttpSourceReader implements SourceReader<SeaTunnelRow, HttpSourceSplit> {
+public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
- private final SourceReader.Context context;
+ private final SingleSplitReaderContext context;
private final HttpSourceParameter parameter;
private HttpClientProvider httpClient;
- public HttpSourceReader(HttpSourceParameter parameter, SourceReader.Context context) {
+ public HttpSourceReader(HttpSourceParameter parameter, SingleSplitReaderContext context) {
this.context = context;
this.parameter = parameter;
}
@@ -76,24 +76,4 @@ public class HttpSourceReader implements SourceReader<SeaTunnelRow, HttpSourceSp
}
}
}
-
- @Override
- public List<HttpSourceSplit> snapshotState(long checkpointId) throws Exception {
- return null;
- }
-
- @Override
- public void addSplits(List<HttpSourceSplit> splits) {
-
- }
-
- @Override
- public void handleNoMoreSplits() {
-
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
deleted file mode 100644
index 5332f67c1..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.http.state;
-
-import java.io.Serializable;
-
-public class HttpState implements Serializable {
-
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 219b00c4d..672303f8c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -126,11 +126,6 @@ public class JdbcSink
return this.seaTunnelRowType;
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
@Override
public Optional<Serializer<JdbcAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 611207a69..1c37dfd77 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -48,7 +47,6 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
- private SeaTunnelContext seaTunnelContext;
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
@@ -94,14 +92,4 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
public String getPluginName() {
return "Kafka";
}
-
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index e5e29f14c..b5a3d42d6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -37,6 +38,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -55,6 +57,11 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private SeaTunnelRowType typeInfo;
private SeaTunnelContext seaTunnelContext;
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ }
+
@Override
public String getPluginName() {
return "Kafka";
@@ -118,11 +125,6 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
return new DefaultSerializer<>();
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return seaTunnelContext;
- }
-
@Override
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
this.seaTunnelContext = seaTunnelContext;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 6e2b724de..20028efb0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -40,7 +40,6 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -80,7 +79,6 @@ import java.util.regex.Pattern;
public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState> {
public static final String IDENTIFIER = "pulsar";
private DeserializationSchema<T> deserialization;
- private SeaTunnelContext seaTunnelContext;
private PulsarAdminConfig adminConfig;
private PulsarClientConfig clientConfig;
@@ -234,16 +232,6 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return this.seaTunnelContext;
- }
-
- @Override
- public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
- this.seaTunnelContext = seaTunnelContext;
- }
-
@Override
public SeaTunnelDataType<T> getProducedType() {
return deserialization.getProducedType();
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
index 447067c7d..857331558 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
@@ -32,10 +32,9 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-common</artifactId>
<version>${project.version}</version>
</dependency>
-
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index 8aed6f46b..a237679d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -19,16 +19,16 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
@@ -36,9 +36,15 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSource.class)
-public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceSplit, SocketState> {
+public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private SocketSourceParameter parameter;
private SeaTunnelContext seaTunnelContext;
+
+ @Override
+ public Boundedness getBoundedness() {
+ return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+ }
+
@Override
public String getPluginName() {
return "Socket";
@@ -49,11 +55,6 @@ public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceS
this.parameter = ConfigBeanFactory.create(pluginConfig, SocketSourceParameter.class);
}
- @Override
- public SeaTunnelContext getSeaTunnelContext() {
- return this.seaTunnelContext;
- }
-
@Override
public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
this.seaTunnelContext = seaTunnelContext;
@@ -65,22 +66,7 @@ public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceS
}
@Override
- public SourceReader<SeaTunnelRow, SocketSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new SocketSourceReader(this.parameter, readerContext);
}
-
- @Override
- public SourceSplitEnumerator<SocketSourceSplit, SocketState> createEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) throws Exception {
- return new SocketSourceSplitEnumerator(enumeratorContext);
- }
-
- @Override
- public SourceSplitEnumerator<SocketSourceSplit, SocketState> restoreEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext, SocketState checkpointState) throws Exception {
- return null;
- }
-
- @Override
- public Serializer<SocketState> getEnumeratorStateSerializer() {
- return new DefaultSerializer<>();
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index 5431b55aa..d37ce180d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -19,8 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,16 +31,16 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.List;
-public class SocketSourceReader implements SourceReader<SeaTunnelRow, SocketSourceSplit> {
+public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER = LoggerFactory.getLogger(SocketSourceReader.class);
private static final int CHAR_BUFFER_SIZE = 8192;
private final SocketSourceParameter parameter;
- private final SourceReader.Context context;
+ private final SingleSplitReaderContext context;
private Socket socket;
private String delimiter = "\n";
- SocketSourceReader(SocketSourceParameter parameter, SourceReader.Context context) {
+
+ SocketSourceReader(SocketSourceParameter parameter, SingleSplitReaderContext context) {
this.parameter = parameter;
this.context = context;
}
@@ -87,24 +88,4 @@ public class SocketSourceReader implements SourceReader<SeaTunnelRow, SocketSour
output.collect(new SeaTunnelRow(new Object[]{buffer.toString()}));
}
}
-
- @Override
- public List<SocketSourceSplit> snapshotState(long checkpointId) throws Exception {
- return null;
- }
-
- @Override
- public void addSplits(List<SocketSourceSplit> splits) {
-
- }
-
- @Override
- public void handleNoMoreSplits() {
-
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
- }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
deleted file mode 100644
index 0d789e2b6..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
-
-import java.io.IOException;
-import java.util.List;
-
-public class SocketSourceSplitEnumerator implements SourceSplitEnumerator<SocketSourceSplit, SocketState> {
-
- private final SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext;
-
- public SocketSourceSplitEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) {
- this.enumeratorContext = enumeratorContext;
- }
-
- @Override
- public void open() {
- }
-
- @Override
- public void run() throws Exception {
-
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void addSplitsBack(List<SocketSourceSplit> splits, int subtaskId) {
-
- }
-
- @Override
- public int currentUnassignedSplitSize() {
- return 0;
- }
-
- @Override
- public void handleSplitRequest(int subtaskId) {
-
- }
-
- @Override
- public void registerReader(int subtaskId) {
-
- }
-
- @Override
- public SocketState snapshotState(long checkpointId) throws Exception {
- return null;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 2a49f8224..27413cac2 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -89,7 +89,7 @@ public class CoordinatedBatchPartitionReader extends ParallelBatchPartitionReade
@Override
protected void handleNoMoreElement(int subtaskId) {
super.handleNoMoreElement(subtaskId);
- if (completedReader.incrementAndGet() == this.parallelism) {
+ if (!this.running) {
CoordinatedBatchPartitionReader.this.running = false;
}
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index e61b2a1b2..0072a188a 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -130,7 +130,7 @@ public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPart
@Override
protected void handleNoMoreElement(int subtaskId) {
super.handleNoMoreElement(subtaskId);
- if (completedReader.incrementAndGet() == this.parallelism) {
+ if (!this.running) {
CoordinatedMicroBatchPartitionReader.this.running = false;
}
}