You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/27 05:58:09 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft][connector] Add simplified connector api (#2041)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new a7357b6a9 [api-draft][connector] Add simplified connector api (#2041)
a7357b6a9 is described below

commit a7357b6a9bf6f9adc7986b21638ae1bb62eca308
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Mon Jun 27 13:58:05 2022 +0800

    [api-draft][connector] Add simplified connector api (#2041)
---
 .../api/source/SeaTunnelRuntimeEnvironment.java    | 11 +--
 .../seatunnel/api/source/SeaTunnelSource.java      |  6 +-
 .../seatunnel-connectors-seatunnel/pom.xml         |  1 +
 .../clickhouse/source/ClickhouseSource.java        | 11 ---
 .../pom.xml                                        |  7 +-
 .../seatunnel/common/sink/AbstractSimpleSink.java  | 59 ++++++++++++++++
 .../seatunnel/common/sink/AbstractSinkWriter.java} | 18 ++---
 .../common/source/AbstractSingleSplitReader.java   | 67 +++++++++++++++++++
 .../common/source/AbstractSingleSplitSource.java   | 58 ++++++++++++++++
 .../seatunnel/common/source/SingleSplit.java}      | 15 +++--
 .../common/source/SingleSplitEnumerator.java}      | 51 ++++++++------
 .../common/source/SingleSplitEnumeratorState.java} |  5 +-
 .../common/source/SingleSplitReaderContext.java}   | 24 +++----
 .../seatunnel-connector-seatunnel-console/pom.xml  |  2 +-
 .../console/sink/ConsoleAggregatedCommitInfo.java  | 21 ------
 .../seatunnel/console/sink/ConsoleCommitInfo.java  | 21 ------
 .../seatunnel/console/sink/ConsoleSink.java        | 26 ++------
 .../seatunnel/console/sink/ConsoleSinkWriter.java  | 18 +----
 .../seatunnel-connector-seatunnel-fake/pom.xml     |  3 +-
 .../seatunnel/fake/source/FakeSource.java          | 42 ++++--------
 .../seatunnel/fake/source/FakeSourceEvent.java     | 45 -------------
 .../seatunnel/fake/source/FakeSourceReader.java    | 34 ++--------
 .../fake/source/FakeSourceSplitEnumerator.java     | 78 ----------------------
 .../fake/source/FakeSupportCoordinate.java         | 23 -------
 .../connectors/seatunnel/fake/state/FakeState.java | 23 -------
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 11 ---
 .../seatunnel-connector-seatunnel-http/pom.xml     |  2 +-
 .../seatunnel/http/source/HttpSource.java          | 40 ++++-------
 .../seatunnel/http/source/HttpSourceReader.java    | 30 ++-------
 .../connectors/seatunnel/http/state/HttpState.java | 24 -------
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  5 --
 .../connectors/seatunnel/kafka/sink/KafkaSink.java | 12 ----
 .../seatunnel/kafka/source/KafkaSource.java        | 12 ++--
 .../seatunnel/pulsar/source/PulsarSource.java      | 12 ----
 .../seatunnel-connector-seatunnel-socket/pom.xml   |  3 +-
 .../seatunnel/socket/source/SocketSource.java      | 40 ++++-------
 .../socket/source/SocketSourceReader.java          | 31 ++-------
 .../socket/source/SocketSourceSplitEnumerator.java | 77 ---------------------
 .../batch/CoordinatedBatchPartitionReader.java     |  2 +-
 .../CoordinatedMicroBatchPartitionReader.java      |  2 +-
 40 files changed, 333 insertions(+), 639 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
index 166306369..c0a51dd57 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
@@ -24,12 +24,7 @@ import org.apache.seatunnel.api.common.SeaTunnelContext;
  */
 public interface SeaTunnelRuntimeEnvironment {
 
-    /**
-     * Returns the SeaTunnel runtime context.
-     *
-     * @return seaTunnelContext
-     */
-    SeaTunnelContext getSeaTunnelContext();
-
-    void setSeaTunnelContext(SeaTunnelContext seaTunnelContext);
+    default void setSeaTunnelContext(SeaTunnelContext seaTunnelContext){
+        // nothing
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index b37a36002..3e1cb8702 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.common.constants.JobMode;
 
 import java.io.Serializable;
 
@@ -42,10 +41,7 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT>
      *
      * @return the boundedness of this source.
      */
-    default Boundedness getBoundedness() {
-        return JobMode.BATCH.equals(getSeaTunnelContext().getJobMode()) ?
-            Boundedness.BOUNDED : Boundedness.UNBOUNDED;
-    }
+    Boundedness getBoundedness();
 
     /**
      * Get the data type of the records produced by this source.
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index b4a9ac77f..6b94005e3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -31,6 +31,7 @@
     <artifactId>seatunnel-connectors-seatunnel</artifactId>
 
     <modules>
+        <module>seatunnel-connector-seatunnel-common</module>
         <module>seatunnel-connector-seatunnel-console</module>
         <module>seatunnel-connector-seatunnel-fake</module>
         <module>seatunnel-connector-seatunnel-kafka</module>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index b8523ca82..c5166d8b7 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -24,7 +24,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -58,7 +57,6 @@ import java.util.stream.Collectors;
 @AutoService(SeaTunnelSource.class)
 public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState> {
 
-    private SeaTunnelContext seaTunnelContext;
     private List<ClickHouseNode> servers;
     private SeaTunnelRowType rowTypeInfo;
     private String sql;
@@ -139,13 +137,4 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
         return new DefaultSerializer<>();
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
similarity index 93%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
index 447067c7d..26bfb0c24 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/pom.xml
@@ -27,7 +27,10 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
+    <artifactId>seatunnel-connector-seatunnel-common</artifactId>
+
+    <properties>
+    </properties>
 
     <dependencies>
         <dependency>
@@ -35,7 +38,5 @@
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
-
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java
new file mode 100644
index 000000000..703243b70
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSimpleSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.sink;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public abstract class AbstractSimpleSink<T, StateT> implements SeaTunnelSink<T, StateT, Void, Void> {
+
+    @Override
+    public abstract AbstractSinkWriter<T, StateT> createWriter(SinkWriter.Context context) throws IOException;
+
+    @Override
+    public SinkWriter<T, Void, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+        return createWriter(context);
+    }
+
+    @Override
+    public final Optional<SinkCommitter<Void>> createCommitter() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public final Optional<Serializer<Void>> getCommitInfoSerializer() {
+        return Optional.empty();
+    }
+
+    @Override
+    public final Optional<SinkAggregatedCommitter<Void, Void>> createAggregatedCommitter() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public final Optional<Serializer<Void>> getAggregatedCommitInfoSerializer() {
+        return Optional.empty();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
similarity index 68%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
index cfdd2cdac..92c19dbf6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
+package org.apache.seatunnel.connectors.seatunnel.common.sink;
 
-import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.sink.SinkWriter;
 
-public class SocketSourceSplit implements SourceSplit {
-    private final String splitId;
+import java.util.Optional;
 
-    public SocketSourceSplit(String splitId) {
-        this.splitId = splitId;
+public abstract class AbstractSinkWriter<T, StateT> implements SinkWriter<T, Void, StateT> {
+
+    @Override
+    public final Optional<Void> prepareCommit() {
+        return Optional.empty();
     }
 
     @Override
-    public String splitId() {
-        return splitId;
+    public final void abortPrepare() {
+        // nothing
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java
new file mode 100644
index 000000000..14d98b65d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractSingleSplitReader<T> implements SourceReader<T, SingleSplit> {
+
+    @Override
+    public final List<SingleSplit> snapshotState(long checkpointId) throws Exception {
+        return Collections.singletonList(new SingleSplit(snapshotStateToBytes(checkpointId)));
+    }
+
+    protected byte[] snapshotStateToBytes(long checkpointId) throws Exception {
+        // default nothing
+        return null;
+    }
+
+    @Override
+    public final void addSplits(List<SingleSplit> splits) {
+        if (splits.size() > 1) {
+            throw new UnsupportedOperationException("The single-split reader don't support reading multiple splits");
+        }
+        byte[] restoredState = splits.get(0).getState();
+        if (restoredState != null && restoredState.length > 0) {
+            restoreState(restoredState);
+        }
+    }
+
+    protected void restoreState(byte[] restoredState) {
+        // default nothing
+    }
+
+    @Override
+    public final void handleNoMoreSplits() {
+        // nothing
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // default nothing
+    }
+
+    @Override
+    public final void handleSourceEvent(SourceEvent sourceEvent) {
+        // nothing
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
new file mode 100644
index 000000000..f2968c833
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+public abstract class AbstractSingleSplitSource<T> implements SeaTunnelSource<T, SingleSplit, SingleSplitEnumeratorState> {
+
+    @Override
+    public final AbstractSingleSplitReader<T> createReader(SourceReader.Context readerContext) throws Exception {
+        checkArgument(readerContext.getIndexOfSubtask() == 0, "Single split source allows only a single reader to be created.");
+        return createReader(new SingleSplitReaderContext(readerContext));
+    }
+
+    public abstract AbstractSingleSplitReader<T> createReader(SingleSplitReaderContext readerContext) throws Exception;
+
+    @Override
+    public final SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<SingleSplit> enumeratorContext) throws Exception {
+        return new SingleSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public final SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<SingleSplit> enumeratorContext, SingleSplitEnumeratorState checkpointState) throws Exception {
+        return createEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public final Serializer<SingleSplitEnumeratorState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+    @Override
+    public final Serializer<SingleSplit> getSplitSerializer() {
+        return new DefaultSerializer<>();
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
similarity index 75%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
index 3496c546c..02951f467 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplit.java
@@ -15,20 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.http.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 
-public class HttpSourceSplit implements SourceSplit {
+public class SingleSplit implements SourceSplit {
+    private final byte[] state;
 
-    private final String splitId;
+    public SingleSplit(byte[] state) {
+        this.state = state;
+    }
 
-    public HttpSourceSplit(String splitId) {
-        this.splitId = splitId;
+    public byte[] getState() {
+        return state;
     }
 
     @Override
     public String splitId() {
-        return this.splitId;
+        return "single";
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
similarity index 54%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
index f67d3335a..74049f1cc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumerator.java
@@ -15,45 +15,56 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.http.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
 
 import java.io.IOException;
 import java.util.List;
 
-public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSourceSplit, HttpState> {
-    private SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext;
-    private HttpState httpState;
+public class SingleSplitEnumerator implements SourceSplitEnumerator<SingleSplit, SingleSplitEnumeratorState> {
+    protected final SourceSplitEnumerator.Context<SingleSplit> context;
+    protected SingleSplit pendingSplit;
+    protected volatile boolean assigned = false;
 
-    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) {
-        this.enumeratorContext = enumeratorContext;
-    }
-
-    public HttpSourceSplitEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext, HttpState httpState) {
-        this.enumeratorContext = enumeratorContext;
-        this.httpState = httpState;
+    public SingleSplitEnumerator(SourceSplitEnumerator.Context<SingleSplit> context) {
+        this.context = context;
     }
 
     @Override
     public void open() {
-
+        // nothing
     }
 
     @Override
     public void run() throws Exception {
+        if (assigned || pendingSplit != null) {
+            return;
+        }
 
+        pendingSplit = new SingleSplit(null);
+        assignSplit();
     }
 
     @Override
     public void close() throws IOException {
-
+        // nothing
     }
 
     @Override
-    public void addSplitsBack(List<HttpSourceSplit> splits, int subtaskId) {
+    public void addSplitsBack(List<SingleSplit> splits, int subtaskId) {
+        pendingSplit = splits.get(0);
+        assignSplit();
+    }
 
+    protected void assignSplit() {
+        if (assigned || pendingSplit == null) {
+            return;
+        }
+        if (context.registeredReaders().contains(0)) {
+            context.assignSplit(0, pendingSplit);
+            assigned = true;
+        }
     }
 
     @Override
@@ -63,21 +74,21 @@ public class HttpSourceSplitEnumerator implements SourceSplitEnumerator<HttpSour
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-
+        // nothing
     }
 
     @Override
     public void registerReader(int subtaskId) {
-
+        assignSplit();
     }
 
     @Override
-    public HttpState snapshotState(long checkpointId) throws Exception {
-        return null;
+    public SingleSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+        return new SingleSplitEnumeratorState();
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
+        // nothing
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
similarity index 86%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
index 3eda1f4c5..e00594e64 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/state/ConsoleState.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitEnumeratorState.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.console.state;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
 
 import java.io.Serializable;
 
-public class ConsoleState implements Serializable {
-
+public class SingleSplitEnumeratorState implements Serializable {
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
similarity index 61%
rename from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
rename to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
index 7276516f0..5c56e5730 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/SingleSplitReaderContext.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.connectors.seatunnel.common.source;
 
-import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SourceReader;
 
-public class FakeSourceSplit implements SourceSplit {
+public class SingleSplitReaderContext {
+    private final SourceReader.Context context;
 
-    private static final long serialVersionUID = -1L;
-
-    private final String splitId;
-
-    public FakeSourceSplit(String splitId) {
-        this.splitId = splitId;
+    public SingleSplitReaderContext(SourceReader.Context context) {
+        this.context = context;
     }
 
-    @Override
-    public String splitId() {
-        return splitId;
+    public Boundedness getBoundedness() {
+        return context.getBoundedness();
     }
 
+    public void signalNoMoreElement() {
+        context.signalNoMoreElement();
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
index 2ad1dceb2..ebd392f85 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/pom.xml
@@ -30,7 +30,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
+            <artifactId>seatunnel-connector-seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
deleted file mode 100644
index d866e0844..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleAggregatedCommitInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.console.sink;
-
-public class ConsoleAggregatedCommitInfo {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
deleted file mode 100644
index 6c41be0ca..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleCommitInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.console.sink;
-
-public class ConsoleCommitInfo {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 36310d161..fee2a1fb5 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,25 +17,22 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
-import java.util.List;
-
 @AutoService(SeaTunnelSink.class)
-public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
+public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
     private Config pluginConfig;
-    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowType seaTunnelRowType;
 
     @Override
@@ -49,16 +46,10 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
     }
 
     @Override
-    public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> createWriter(SinkWriter.Context context) {
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
         return new ConsoleSinkWriter(seaTunnelRowType);
     }
 
-    @Override
-    public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> restoreWriter(
-        SinkWriter.Context context, List<ConsoleState> states) {
-        return restoreWriter(context, states);
-    }
-
     @Override
     public String getPluginName() {
         return "Console";
@@ -69,13 +60,4 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
         this.pluginConfig = pluginConfig;
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 09e386d3e..e226f1623 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -17,18 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
-import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
-import java.util.Optional;
 
-public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> {
+public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
 
@@ -44,18 +42,8 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
         System.out.println(Arrays.toString(element.getFields()));
     }
 
-    @Override
-    public Optional<ConsoleCommitInfo> prepareCommit() {
-        return Optional.empty();
-    }
-
-    @Override
-    public void abortPrepare() {
-
-    }
-
     @Override
     public void close() {
-
+        // nothing
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
index dfc8b76aa..727bd329d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml
@@ -32,10 +32,9 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
+            <artifactId>seatunnel-connector-seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index d641c30a9..65e6587f4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -18,27 +18,32 @@
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSource.class)
-public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
+public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     private Config pluginConfig;
     private SeaTunnelContext seaTunnelContext;
 
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
+
     @Override
     public SeaTunnelRowType getProducedType() {
         return new SeaTunnelRowType(
@@ -47,28 +52,10 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
         return new FakeSourceReader(readerContext);
     }
 
-    @Override
-    public SourceSplitEnumerator<FakeSourceSplit, FakeState> createEnumerator(
-        SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
-        return new FakeSourceSplitEnumerator(enumeratorContext);
-    }
-
-    @Override
-    public SourceSplitEnumerator<FakeSourceSplit, FakeState> restoreEnumerator(
-        SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeState checkpointState) {
-        // todo:
-        return null;
-    }
-
-    @Override
-    public Serializer<FakeState> getEnumeratorStateSerializer() {
-        return new DefaultSerializer<>();
-    }
-
     @Override
     public String getPluginName() {
         return "FakeSource";
@@ -79,11 +66,6 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
         this.pluginConfig = pluginConfig;
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
     @Override
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
deleted file mode 100644
index 76e0a3eb7..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SourceEvent;
-
-public class FakeSourceEvent implements SourceEvent {
-
-    private final String name;
-    private final int age;
-    private final long timestamp;
-
-    public FakeSourceEvent(String name, int age, long timestamp) {
-        this.name = name;
-        this.age = age;
-        this.timestamp = timestamp;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getAge() {
-        return age;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 5a387f871..7007eaf25 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,37 +19,37 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
-public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
+public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
 
-    private final SourceReader.Context context;
+    private final SingleSplitReaderContext context;
 
     private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
     private final int[] ages = {11, 22, 33, 44};
 
-    public FakeSourceReader(SourceReader.Context context) {
+    public FakeSourceReader(SingleSplitReaderContext context) {
         this.context = context;
     }
 
     @Override
     public void open() {
-
+        // nothing
     }
 
     @Override
     public void close() {
-
+        // nothing
     }
 
     @Override
@@ -70,24 +70,4 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
         }
         Thread.sleep(1000L);
     }
-
-    @Override
-    public List<FakeSourceSplit> snapshotState(long checkpointId) {
-        return null;
-    }
-
-    @Override
-    public void addSplits(List<FakeSourceSplit> splits) {
-
-    }
-
-    @Override
-    public void handleNoMoreSplits() {
-
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
deleted file mode 100644
index 9b2900c1e..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
-
-import java.io.IOException;
-import java.util.List;
-
-public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeState> {
-
-    private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
-
-    public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
-        this.enumeratorContext = enumeratorContext;
-    }
-
-    @Override
-    public void open() {
-
-    }
-
-    @Override
-    public void run() {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
-
-    }
-
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
-    }
-
-    @Override
-    public void handleSplitRequest(int subtaskId) {
-
-    }
-
-    @Override
-    public void registerReader(int subtaskId) {
-
-    }
-
-    @Override
-    public FakeState snapshotState(long checkpointId) throws Exception {
-        return null;
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
-    }
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
deleted file mode 100644
index f7a9b098a..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSupportCoordinate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.source.SupportCoordinate;
-
-public class FakeSupportCoordinate implements SupportCoordinate {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
deleted file mode 100644
index 0c0edecd0..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeState.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.state;
-
-import java.io.Serializable;
-
-public class FakeState implements Serializable {
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 6ce13bf2f..f1ba12edd 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -78,16 +77,6 @@ public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
         return new HiveSinkWriter(seaTunnelRowType, config, context, System.currentTimeMillis());
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return null;
-    }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-
-    }
-
     @Override
     public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
index 1e78fb8a8..b8c29614a 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/pom.xml
@@ -32,7 +32,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
+            <artifactId>seatunnel-connector-seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
 
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 91f983256..0b906144b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -26,19 +26,19 @@ import static org.apache.seatunnel.connectors.seatunnel.http.config.Config.URL;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.http.state.HttpState;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -48,15 +48,21 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 @AutoService(SeaTunnelSource.class)
-public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit, HttpState> {
+public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private final HttpSourceParameter parameter = new HttpSourceParameter();
     private SeaTunnelRowType rowType;
     private SeaTunnelContext seaTunnelContext;
+
     @Override
     public String getPluginName() {
         return "Http";
     }
 
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
+
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL);
@@ -86,11 +92,6 @@ public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit
         this.rowType = new SeaTunnelRowType(new String[]{"content"}, new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return this.seaTunnelContext;
-    }
-
     @Override
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
@@ -102,22 +103,7 @@ public class HttpSource implements SeaTunnelSource<SeaTunnelRow, HttpSourceSplit
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, HttpSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
         return new HttpSourceReader(this.parameter, readerContext);
     }
-
-    @Override
-    public SourceSplitEnumerator<HttpSourceSplit, HttpState> createEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext) throws Exception {
-        return new HttpSourceSplitEnumerator(enumeratorContext);
-    }
-
-    @Override
-    public SourceSplitEnumerator<HttpSourceSplit, HttpState> restoreEnumerator(SourceSplitEnumerator.Context<HttpSourceSplit> enumeratorContext, HttpState checkpointState) throws Exception {
-        return new HttpSourceSplitEnumerator(enumeratorContext, checkpointState);
-    }
-
-    @Override
-    public Serializer<HttpState> getEnumeratorStateSerializer() {
-        return new DefaultSerializer<>();
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index cc31b2761..f11d6f581 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -21,8 +21,9 @@ import static org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 
@@ -30,17 +31,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 
-public class HttpSourceReader implements SourceReader<SeaTunnelRow, HttpSourceSplit> {
+public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
-    private final SourceReader.Context context;
+    private final SingleSplitReaderContext context;
     private final HttpSourceParameter parameter;
     private HttpClientProvider httpClient;
 
-    public HttpSourceReader(HttpSourceParameter parameter, SourceReader.Context context) {
+    public HttpSourceReader(HttpSourceParameter parameter, SingleSplitReaderContext context) {
         this.context = context;
         this.parameter = parameter;
     }
@@ -76,24 +76,4 @@ public class HttpSourceReader implements SourceReader<SeaTunnelRow, HttpSourceSp
             }
         }
     }
-
-    @Override
-    public List<HttpSourceSplit> snapshotState(long checkpointId) throws Exception {
-        return null;
-    }
-
-    @Override
-    public void addSplits(List<HttpSourceSplit> splits) {
-
-    }
-
-    @Override
-    public void handleNoMoreSplits() {
-
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
deleted file mode 100644
index 5332f67c1..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-http/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/state/HttpState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.http.state;
-
-import java.io.Serializable;
-
-public class HttpState implements Serializable {
-
-}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 219b00c4d..672303f8c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -126,11 +126,6 @@ public class JdbcSink
         return this.seaTunnelRowType;
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
     @Override
     public Optional<Serializer<JdbcAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 611207a69..1c37dfd77 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -48,7 +47,6 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
 
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
-    private SeaTunnelContext seaTunnelContext;
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
@@ -94,14 +92,4 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
     public String getPluginName() {
         return "Kafka";
     }
-
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index e5e29f14c..b5a3d42d6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
@@ -37,6 +38,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
@@ -55,6 +57,11 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     private SeaTunnelRowType typeInfo;
     private SeaTunnelContext seaTunnelContext;
 
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
+
     @Override
     public String getPluginName() {
         return "Kafka";
@@ -118,11 +125,6 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
         return new DefaultSerializer<>();
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return seaTunnelContext;
-    }
-
     @Override
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 6e2b724de..20028efb0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -40,7 +40,6 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -80,7 +79,6 @@ import java.util.regex.Pattern;
 public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState> {
     public static final String IDENTIFIER = "pulsar";
     private DeserializationSchema<T> deserialization;
-    private SeaTunnelContext seaTunnelContext;
 
     private PulsarAdminConfig adminConfig;
     private PulsarClientConfig clientConfig;
@@ -234,16 +232,6 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
         return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return this.seaTunnelContext;
-    }
-
-    @Override
-    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
-        this.seaTunnelContext = seaTunnelContext;
-    }
-
     @Override
     public SeaTunnelDataType<T> getProducedType() {
         return deserialization.getProducedType();
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
index 447067c7d..857331558 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/pom.xml
@@ -32,10 +32,9 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
+            <artifactId>seatunnel-connector-seatunnel-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
index 8aed6f46b..a237679d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSource.java
@@ -19,16 +19,16 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
@@ -36,9 +36,15 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSource.class)
-public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceSplit, SocketState> {
+public class SocketSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private SocketSourceParameter parameter;
     private SeaTunnelContext seaTunnelContext;
+
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(seaTunnelContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
+    }
+
     @Override
     public String getPluginName() {
         return "Socket";
@@ -49,11 +55,6 @@ public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceS
         this.parameter = ConfigBeanFactory.create(pluginConfig, SocketSourceParameter.class);
     }
 
-    @Override
-    public SeaTunnelContext getSeaTunnelContext() {
-        return this.seaTunnelContext;
-    }
-
     @Override
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
@@ -65,22 +66,7 @@ public class SocketSource implements SeaTunnelSource<SeaTunnelRow, SocketSourceS
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, SocketSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
         return new SocketSourceReader(this.parameter, readerContext);
     }
-
-    @Override
-    public SourceSplitEnumerator<SocketSourceSplit, SocketState> createEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) throws Exception {
-        return new SocketSourceSplitEnumerator(enumeratorContext);
-    }
-
-    @Override
-    public SourceSplitEnumerator<SocketSourceSplit, SocketState> restoreEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext, SocketState checkpointState) throws Exception {
-        return null;
-    }
-
-    @Override
-    public Serializer<SocketState> getEnumeratorStateSerializer() {
-        return new DefaultSerializer<>();
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index 5431b55aa..d37ce180d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -19,8 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.socket.source;
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,16 +31,16 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.List;
 
-public class SocketSourceReader implements SourceReader<SeaTunnelRow, SocketSourceSplit> {
+public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     private static final Logger LOGGER = LoggerFactory.getLogger(SocketSourceReader.class);
     private static final int CHAR_BUFFER_SIZE = 8192;
     private final SocketSourceParameter parameter;
-    private final SourceReader.Context context;
+    private final SingleSplitReaderContext context;
     private Socket socket;
     private String delimiter = "\n";
-    SocketSourceReader(SocketSourceParameter parameter, SourceReader.Context context) {
+
+    SocketSourceReader(SocketSourceParameter parameter, SingleSplitReaderContext context) {
         this.parameter = parameter;
         this.context = context;
     }
@@ -87,24 +88,4 @@ public class SocketSourceReader implements SourceReader<SeaTunnelRow, SocketSour
             output.collect(new SeaTunnelRow(new Object[]{buffer.toString()}));
         }
     }
-
-    @Override
-    public List<SocketSourceSplit> snapshotState(long checkpointId) throws Exception {
-        return null;
-    }
-
-    @Override
-    public void addSplits(List<SocketSourceSplit> splits) {
-
-    }
-
-    @Override
-    public void handleNoMoreSplits() {
-
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
deleted file mode 100644
index 0d789e2b6..000000000
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceSplitEnumerator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.socket.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.socket.state.SocketState;
-
-import java.io.IOException;
-import java.util.List;
-
-public class SocketSourceSplitEnumerator implements SourceSplitEnumerator<SocketSourceSplit, SocketState> {
-
-    private final SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext;
-
-    public SocketSourceSplitEnumerator(SourceSplitEnumerator.Context<SocketSourceSplit> enumeratorContext) {
-        this.enumeratorContext = enumeratorContext;
-    }
-
-    @Override
-    public void open() {
-    }
-
-    @Override
-    public void run() throws Exception {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public void addSplitsBack(List<SocketSourceSplit> splits, int subtaskId) {
-
-    }
-
-    @Override
-    public int currentUnassignedSplitSize() {
-        return 0;
-    }
-
-    @Override
-    public void handleSplitRequest(int subtaskId) {
-
-    }
-
-    @Override
-    public void registerReader(int subtaskId) {
-
-    }
-
-    @Override
-    public SocketState snapshotState(long checkpointId) throws Exception {
-        return null;
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
index 2a49f8224..27413cac2 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/batch/CoordinatedBatchPartitionReader.java
@@ -89,7 +89,7 @@ public class CoordinatedBatchPartitionReader extends ParallelBatchPartitionReade
         @Override
         protected void handleNoMoreElement(int subtaskId) {
             super.handleNoMoreElement(subtaskId);
-            if (completedReader.incrementAndGet() == this.parallelism) {
+            if (!this.running) {
                 CoordinatedBatchPartitionReader.this.running = false;
             }
         }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
index e61b2a1b2..0072a188a 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -130,7 +130,7 @@ public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPart
         @Override
         protected void handleNoMoreElement(int subtaskId) {
             super.handleNoMoreElement(subtaskId);
-            if (completedReader.incrementAndGet() == this.parallelism) {
+            if (!this.running) {
                 CoordinatedMicroBatchPartitionReader.this.running = false;
             }
         }