Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/10 17:16:27 UTC

[incubator-seatunnel] branch api-draft updated: [Api-Draft] SeaTunnel Source supports the Flink engine. (#1842)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 6b5136b7 [Api-Draft] SeaTunnel Source supports the Flink engine. (#1842)
6b5136b7 is described below

commit 6b5136b7b015754cd2e3aea2d8735a9f82f10df2
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed May 11 01:16:20 2022 +0800

    [Api-Draft] SeaTunnel Source supports the Flink engine. (#1842)
    
    Coordinated sources are not yet supported
---
 .../org/apache/seatunnel/api/source/Source.java    |   3 +
 .../apache/seatunnel/api/source/SourceReader.java  |  36 +++--
 .../api/source/SourceSplitEnumerator.java          |  44 ++++---
 .../seatunnel/api/table/factory/FactoryUtil.java   |   1 +
 .../source/CoordinatedEnumeratorContext.java       |  60 +++++++++
 .../translation/source/CoordinatedSource.java      |   9 +-
 .../source/ParallelEnumeratorContext.java          |  77 +++++++++++
 .../translation/source/ParallelReaderContext.java  |  54 ++++++++
 .../translation/source/ParallelSource.java         | 145 ++++++++++++++++++++-
 .../ThreadPoolExecutorFactory.java}                |  17 ++-
 .../flink/serialization/KryoTypeInfo.java          |  41 ++++++
 .../flink/serialization/WrappedRow.java}           |  37 +++++-
 .../flink/source/SeaTunnelParallelSource.java      | 130 ++++++++++++++++++
 .../flink/source/WrappedRowCollector.java          |  46 +++++++
 .../seatunnel-translation-spark/pom.xml            |  24 +++-
 .../spark/source/ParallelSourceSupport.java        |  61 +++++++++
 .../translation/spark/source/SparkState.java       |  30 +++++
 17 files changed, 782 insertions(+), 33 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
index 80cf9aaf..7d77f0d7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -39,9 +39,12 @@ public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializa
 
     SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
 
+    Serializer<SplitT> getSplitSerializer();
+
     SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
 
     SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
 
     Serializer<StateT> getEnumeratorStateSerializer();
+
 }
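
The new getSplitSerializer() hook mirrors getEnumeratorStateSerializer(): the engine-independent wrapper (ParallelSource, later in this patch) uses it to turn split state into raw bytes for checkpointing. A minimal sketch of what a connector could return, assuming its splits are java.io.Serializable; the class name JavaSplitSerializer is hypothetical and the Serializer method signatures are inferred from their use in ParallelSource:

    // Illustrative sketch, not part of this commit; Serializer's signatures are assumed
    // from how ParallelSource calls serialize(T)/deserialize(byte[]).
    import org.apache.seatunnel.api.serialization.Serializer;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class JavaSplitSerializer<SplitT extends Serializable> implements Serializer<SplitT> {

        @Override
        public byte[] serialize(SplitT split) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(split);
            }
            return buffer.toByteArray();
        }

        @Override
        @SuppressWarnings("unchecked")
        public SplitT deserialize(byte[] serialized) throws IOException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                return (SplitT) in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException("Failed to deserialize split", e);
            }
        }
    }
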
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index dff0527d..8131e835 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -19,35 +19,53 @@ package org.apache.seatunnel.api.source;
 
 import org.apache.seatunnel.api.state.CheckpointListener;
 
+import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
-public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener {
+public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {
 
-    void start(Collector<T> output) throws Exception;
+    void open();
 
-    List<SplitT> snapshotState(long checkpointId);
+    /**
+     * Called to close the reader, in case it holds on to any resources, like threads or network
+     * connections.
+     */
+    @Override
+    void close() throws IOException;
+
+    void pollNext(Collector<T> output) throws Exception;
+
+    List<SplitT> snapshotState(long checkpointId) throws Exception;
 
     void addSplits(List<SplitT> splits);
 
+    /**
+     * This method is called when the reader is notified that it will not receive any further
+     * splits.
+     *
+     * <p>It is triggered when the enumerator calls {@link
+     * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask index.
+     */
+    void handleNoMoreSplits();
+
     default void handleSourceEvent(SourceEvent sourceEvent) {
     }
 
     interface Context {
 
         /**
-         * Gets the configuration with which Flink was started.
+         * @return The index of this subtask.
          */
-        Map<String, String> getConfiguration();
+        int getIndexOfSubtask();
 
         /**
-         * @return The index of this subtask.
+         * Indicator that the input has reached the end of data.
          */
-        int getIndexOfSubtask();
+        void signalNoMoreElement();
 
         /**
          * Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to
-         * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's
+         * the {@link SourceSplitEnumerator#handleSplitRequest(int)} method, with this reader's
          * parallel subtask id and the hostname where this reader runs.
          */
         void sendSplitRequest();
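
The reader contract is now pull-based with an explicit lifecycle: open(), repeated pollNext(Collector), close(), plus handleNoMoreSplits() driven by the enumerator and Context#signalNoMoreElement() for a bounded reader to end its subtask. A minimal sketch against these hooks; the class name EchoSplitReader and its emit-one-string-per-split behaviour are purely illustrative:

    // Illustrative sketch, not part of this commit.
    import org.apache.seatunnel.api.source.Collector;
    import org.apache.seatunnel.api.source.SourceReader;
    import org.apache.seatunnel.api.source.SourceSplit;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class EchoSplitReader<SplitT extends SourceSplit> implements SourceReader<String, SplitT> {

        private final SourceReader.Context context;
        private final Queue<SplitT> pendingSplits = new ConcurrentLinkedQueue<>();
        private volatile boolean noMoreSplits = false;

        public EchoSplitReader(SourceReader.Context context) {
            this.context = context;
        }

        @Override
        public void open() {
            // nothing to initialize for this in-memory example
        }

        @Override
        public void close() throws IOException {
            // no external resources to release
        }

        @Override
        public void pollNext(Collector<String> output) throws Exception {
            SplitT split = pendingSplits.poll();
            if (split != null) {
                // emit one record per split; a real reader would read the split's data here
                output.collect(String.valueOf(split));
            } else if (noMoreSplits) {
                // bounded case: tell the runtime this subtask has reached the end of data
                context.signalNoMoreElement();
            } else {
                // nothing assigned yet: ask the enumerator for more work
                context.sendSplitRequest();
                Thread.sleep(100L);
            }
        }

        @Override
        public List<SplitT> snapshotState(long checkpointId) {
            // unread splits are the only state needed to resume this reader
            return new ArrayList<>(pendingSplits);
        }

        @Override
        public void addSplits(List<SplitT> splits) {
            pendingSplits.addAll(splits);
        }

        @Override
        public void handleNoMoreSplits() {
            noMoreSplits = true;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
        }
    }
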
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 68c2b694..77e75c8f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -18,17 +18,35 @@
 package org.apache.seatunnel.api.source;
 
 import org.apache.seatunnel.api.state.CheckpointListener;
-import org.apache.seatunnel.common.constants.CollectionConstants;
 
+import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
-public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener {
+public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends AutoCloseable, CheckpointListener {
 
-    void handleSplitRequest(int subtaskId, String requesterHostname);
+    void open();
+
+    void run();
+
+    /**
+     * Called to close the enumerator, in case it holds on to any resources, like threads or network
+     * connections.
+     */
+    @Override
+    void close() throws IOException;
+
+    /**
+     * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
+     * fails and there are splits assigned to it after the last successful checkpoint.
+     *
+     * @param splits    The splits to add back to the enumerator for reassignment.
+     * @param subtaskId The id of the subtask to which the returned splits belong.
+     */
+    void addSplitsBack(List<SplitT> splits, int subtaskId);
+
+    void handleSplitRequest(int subtaskId);
 
     void registerReader(int subtaskId);
 
@@ -50,24 +68,20 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> exten
 
         /**
          * Assign the splits.
-         *
-         * @param newSplitAssignments the new split assignments to add.
          */
-        void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments);
+        void assignSplit(int subtaskId, List<SplitT> splits);
 
         /**
          * Assigns a single split.
          *
          * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
-         * call to the {@link #assignSplits} method.
+         * call to the {@link #assignSplit} method.
          *
-         * @param split   The new split
-         * @param subtask The index of the operator's parallel subtask that shall receive the split.
+         * @param split     The new split
+         * @param subtaskId The index of the operator's parallel subtask that shall receive the split.
          */
-        default void assignSplit(SplitT split, int subtask) {
-            Map<Integer, List<SplitT>> splits = new HashMap<>(CollectionConstants.MAP_SIZE);
-            splits.put(subtask, Collections.singletonList(split));
-            assignSplits(splits);
+        default void assignSplit(int subtaskId, SplitT split) {
+            assignSplit(subtaskId, Collections.singletonList(split));
         }
 
         /**
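
On the enumerator side, assignment is now addressed per subtask: assignSplit(subtaskId, splits) replaces the map-based assignSplits, and Context#signalNoMoreSplits(subtaskId) ends a reader's input. A compact sketch of an enumerator that hands out a fixed list of splits on request; the class name and the List-of-splits state type are hypothetical, and methods not visible in the hunks above (snapshotState, the checkpoint callbacks) follow their use in ParallelSource later in this patch:

    // Illustrative sketch, not part of this commit.
    import org.apache.seatunnel.api.source.SourceSplit;
    import org.apache.seatunnel.api.source.SourceSplitEnumerator;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class FixedSplitEnumerator<SplitT extends SourceSplit>
            implements SourceSplitEnumerator<SplitT, List<SplitT>> {

        private final SourceSplitEnumerator.Context<SplitT> context;
        private final Queue<SplitT> remainingSplits = new ConcurrentLinkedQueue<>();

        public FixedSplitEnumerator(SourceSplitEnumerator.Context<SplitT> context, List<SplitT> splits) {
            this.context = context;
            this.remainingSplits.addAll(splits);
        }

        @Override
        public void open() {
        }

        @Override
        public void run() {
            // purely request-driven: splits are handed out in handleSplitRequest()
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public void addSplitsBack(List<SplitT> splits, int subtaskId) {
            // splits owned by a failed reader come back for reassignment
            remainingSplits.addAll(splits);
        }

        @Override
        public void handleSplitRequest(int subtaskId) {
            SplitT next = remainingSplits.poll();
            if (next != null) {
                context.assignSplit(subtaskId, next);
            } else {
                context.signalNoMoreSplits(subtaskId);
            }
        }

        @Override
        public void registerReader(int subtaskId) {
            // nothing to track: registered readers simply request splits
        }

        @Override
        public List<SplitT> snapshotState(long checkpointId) {
            return new ArrayList<>(remainingSplits);
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
        }
    }
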
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 348ba654..82e84333 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -48,6 +48,7 @@ public final class FactoryUtil {
             String factoryIdentifier) {
 
         try {
+
             final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
             List<Source> sources = new ArrayList<>(multipleTables.size());
             if (factory instanceof SupportMultipleTable) {
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
new file mode 100644
index 00000000..8c06e3c8
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.util.List;
+import java.util.Set;
+
+public class CoordinatedEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+    protected final CoordinatedSource<?, SplitT, ?> coordinatedSource;
+
+    public CoordinatedEnumeratorContext(CoordinatedSource<?, SplitT, ?> coordinatedSource) {
+        this.coordinatedSource = coordinatedSource;
+    }
+
+    @Override
+    public int currentParallelism() {
+        return 0;
+    }
+
+    @Override
+    public Set<Integer> registeredReaders() {
+        return null;
+    }
+
+    @Override
+    public void assignSplit(int subtaskId, List<SplitT> splits) {
+
+    }
+
+    @Override
+    public void signalNoMoreSplits(int subtaskId) {
+
+    }
+
+    @Override
+    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 2816df97..142bd8e3 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -17,6 +17,13 @@
 
 package org.apache.seatunnel.translation.source;
 
-public class CoordinatedSource {
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> {
+    protected volatile Map<Integer, SourceReader<T, SplitT>> readerMap = new ConcurrentHashMap<>();
 
 }
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
new file mode 100644
index 00000000..8076fe1b
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class ParallelEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+    protected final ParallelSource<?, SplitT, ?> parallelSource;
+    protected final Integer parallelism;
+    protected final Integer subtaskId;
+    protected volatile boolean running = false;
+
+    public ParallelEnumeratorContext(ParallelSource<?, SplitT, ?> parallelSource,
+                                     int parallelism,
+                                     int subtaskId) {
+        this.parallelSource = parallelSource;
+        this.parallelism = parallelism;
+        this.subtaskId = subtaskId;
+    }
+
+    @Override
+    public int currentParallelism() {
+        return running ? parallelism : 0;
+    }
+
+    @Override
+    public Set<Integer> registeredReaders() {
+        return running ? Collections.singleton(subtaskId) : Collections.emptySet();
+    }
+
+    public void register() {
+        running = true;
+    }
+
+    @Override
+    public void assignSplit(int subtaskId, List<SplitT> splits) {
+        if (this.subtaskId == subtaskId) {
+            parallelSource.addSplits(splits);
+        }
+    }
+
+    @Override
+    public void signalNoMoreSplits(int subtaskId) {
+        if (this.subtaskId == subtaskId) {
+            parallelSource.handleNoMoreSplits();
+        }
+    }
+
+    @Override
+    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+        // TODO: exception
+        throw new RuntimeException("");
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
new file mode 100644
index 00000000..0535645f
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+
+public class ParallelReaderContext implements SourceReader.Context {
+
+    protected final ParallelSource<?, ?, ?> parallelSource;
+    protected final Integer subtaskId;
+
+    public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
+                                 Integer subtaskId) {
+        this.parallelSource = parallelSource;
+        this.subtaskId = subtaskId;
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return subtaskId;
+    }
+
+    @Override
+    public void signalNoMoreElement() {
+        parallelSource.handleNoMoreElement();
+    }
+
+    @Override
+    public void sendSplitRequest() {
+        parallelSource.handleSplitRequest(subtaskId);
+    }
+
+    @Override
+    public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+        // TODO: exception
+        throw new RuntimeException("");
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 975fbfa7..3a072902 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -17,5 +17,148 @@
 
 package org.apache.seatunnel.translation.source;
 
-public class ParallelSource {
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.state.CheckpointListener;
+import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements AutoCloseable, CheckpointListener {
+
+    protected final Source<T, SplitT, StateT> source;
+    protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
+    protected final ParallelReaderContext readerContext;
+    protected final Integer subtaskId;
+    protected final Integer parallelism;
+
+    protected final Serializer<SplitT> splitSerializer;
+    protected final Serializer<StateT> enumeratorStateSerializer;
+
+    protected final List<SplitT> restoredSplitState;
+
+    protected transient volatile SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
+    protected transient volatile SourceReader<T, SplitT> reader;
+    protected transient volatile ExecutorService executorService;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    private volatile boolean running = true;
+
+    public ParallelSource(Source<T, SplitT, StateT> source,
+                          List<byte[]> restoredState,
+                          int parallelism,
+                          int subtaskId) {
+        this.source = source;
+        this.subtaskId = subtaskId;
+        this.parallelism = parallelism;
+
+        this.splitSerializer = source.getSplitSerializer();
+        this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
+        this.parallelEnumeratorContext = new ParallelEnumeratorContext<>(this, parallelism, subtaskId);
+        this.readerContext = new ParallelReaderContext(this, subtaskId);
+
+        // Create or restore split enumerator & reader
+        try {
+            if (restoredState != null && restoredState.size() > 0) {
+                StateT restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(0));
+                restoredSplitState = new ArrayList<>(restoredState.size());
+                for (int i = 1; i < restoredState.size(); i++) {
+                    restoredSplitState.add(splitSerializer.deserialize(restoredState.get(i)));
+                }
+
+                splitEnumerator = source.restoreEnumerator(parallelEnumeratorContext, restoredEnumeratorState);
+            } else {
+                restoredSplitState = Collections.emptyList();
+                splitEnumerator = source.createEnumerator(parallelEnumeratorContext);
+            }
+            reader = source.createReader(readerContext);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private transient volatile Thread splitEnumeratorThread;
+
+    public void open() throws Exception {
+        executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, String.format("parallel-split-enumerator-executor-%s", subtaskId));
+        splitEnumerator.open();
+        splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+        reader.open();
+        parallelEnumeratorContext.register();
+        splitEnumerator.registerReader(subtaskId);
+    }
+
+    public void run(Collector<T> collector) throws Exception {
+        executorService.execute(() -> splitEnumerator.run());
+        while (running) {
+            reader.pollNext(collector);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // set ourselves as not running;
+        // this would let the main discovery loop escape as soon as possible
+        running = false;
+        splitEnumerator.close();
+        reader.close();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Reader context methods
+    // --------------------------------------------------------------------------------------------
+
+    protected void handleNoMoreElement() {
+        running = false;
+    }
+
+    protected void handleSplitRequest(int subtaskId) {
+        splitEnumerator.handleSplitRequest(subtaskId);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Enumerator context methods
+    // --------------------------------------------------------------------------------------------
+
+    protected void addSplits(List<SplitT> splits) {
+        reader.addSplits(splits);
+    }
+
+    protected void handleNoMoreSplits() {
+        reader.handleNoMoreSplits();
+    }
+
+    public List<byte[]> snapshotState(long checkpointId) throws Exception {
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(enumeratorState);
+        List<SplitT> splitStates = reader.snapshotState(checkpointId);
+        final List<byte[]> rawValues = new ArrayList<>(splitStates.size() + 1);
+        rawValues.add(enumeratorStateBytes);
+        for (SplitT splitState : splitStates) {
+            rawValues.add(splitSerializer.serialize(splitState));
+        }
+        return rawValues;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        splitEnumerator.notifyCheckpointComplete(checkpointId);
+        reader.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        splitEnumerator.notifyCheckpointAborted(checkpointId);
+        reader.notifyCheckpointAborted(checkpointId);
+    }
 }
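
ParallelSource binds one enumerator and one reader per subtask and drives them through open(), run(Collector) and close(); snapshotState() packs the enumerator state first and the reader's split states after it, matching how the constructor unpacks restoredState. A hypothetical driver showing the intended call sequence (SeaTunnelParallelSource below does the equivalent on Flink):

    // Illustrative sketch, not part of this commit.
    import org.apache.seatunnel.api.source.Collector;
    import org.apache.seatunnel.api.source.Source;
    import org.apache.seatunnel.api.source.SourceSplit;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.translation.source.ParallelSource;

    import java.util.List;

    public final class ParallelSourceDriver {

        /** Runs one subtask of the given source until its reader signals no more elements. */
        public static <SplitT extends SourceSplit, StateT> List<byte[]> runSubtask(
                Source<SeaTunnelRow, SplitT, StateT> source,
                Collector<SeaTunnelRow> collector,
                List<byte[]> restoredState,
                int parallelism,
                int subtaskId) throws Exception {
            ParallelSource<SeaTunnelRow, SplitT, StateT> parallelSource =
                    new ParallelSource<>(source, restoredState, parallelism, subtaskId);
            parallelSource.open();
            try {
                // blocks until the reader signals no more elements (handleNoMoreElement)
                parallelSource.run(collector);
                // element 0 is the enumerator state, elements 1..n are the split states
                return parallelSource.snapshotState(0L);
            } finally {
                parallelSource.close();
            }
        }

        private ParallelSourceDriver() {
        }
    }
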
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
similarity index 55%
copy from seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
index 2816df97..cd5dffa6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
@@ -15,8 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.source;
+package org.apache.seatunnel.translation.util;
 
-public class CoordinatedSource {
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
+public class ThreadPoolExecutorFactory {
+    private ThreadPoolExecutorFactory() {
+    }
+
+    public static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(int corePoolSize, String name) {
+        AtomicInteger cnt = new AtomicInteger(0);
+        return new ScheduledThreadPoolExecutor(corePoolSize, runnable -> {
+            Thread thread = new Thread(runnable);
+            thread.setName(name + "-" + cnt.incrementAndGet());
+            return thread;
+        });
+    }
 }
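
ThreadPoolExecutorFactory only adds deterministic thread naming on top of ScheduledThreadPoolExecutor; a short usage sketch (the wrapper class is hypothetical, the name string matches the one ParallelSource#open passes above):

    // Illustrative sketch, not part of this commit.
    import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public final class ThreadPoolExecutorFactoryExample {

        public static void main(String[] args) throws InterruptedException {
            ScheduledThreadPoolExecutor executor =
                    ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
                            1, "parallel-split-enumerator-executor-0");
            // worker threads are named <name>-<n>, e.g. parallel-split-enumerator-executor-0-1
            executor.execute(() -> System.out.println(Thread.currentThread().getName()));
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
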
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
new file mode 100644
index 00000000..20530f24
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+/**
+ * Operator chaining avoids serialization and deserialization while the job is running. Flink
+ * nevertheless checks at job initialization whether the produced type is a generic type, so
+ * disabling generic-type serialization would fail with an exception even though the chained
+ * operator never serializes records at run time. This subclass therefore always creates a
+ * {@link KryoSerializer}.
+ */
+public class KryoTypeInfo<T> extends GenericTypeInfo<T> {
+    private static final long serialVersionUID = -4367528355992922603L;
+
+    public KryoTypeInfo(Class<T> typeClass) {
+        super(typeClass);
+    }
+
+    @Override
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        return new KryoSerializer<T>(getTypeClass(), config);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 50%
copy from seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
index 2816df97..9c7af59a 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
@@ -15,8 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.source;
+package org.apache.seatunnel.translation.flink.serialization;
 
-public class CoordinatedSource {
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
 
+/**
+ * Wrapped {@link Row}.
+ *
+ * <p>Keeps the original table name so that the dispatcher can route the row to the
+ * corresponding data stream.
+ */
+public class WrappedRow extends Tuple2<Row, String> {
+    private static final long serialVersionUID = -8325988931728734377L;
+
+    public WrappedRow() {
+        super();
+    }
+
+    public WrappedRow(Row row, String table) {
+        super(row, table);
+    }
+
+    public Row getRow() {
+        return this.f0;
+    }
+
+    public String getTable() {
+        return this.f1;
+    }
+
+    public void setRow(Row row) {
+        this.f0 = row;
+    }
+
+    public void setTable(String table) {
+        this.f1 = table;
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
new file mode 100644
index 00000000..a51fbc7a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.translation.source.ParallelSource;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedRow>
+        implements CheckpointListener, ResultTypeQueryable<WrappedRow>, CheckpointedFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelParallelSource.class);
+    protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
+
+    protected final Source<SeaTunnelRow, ?, ?> source;
+    protected volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
+
+    protected transient ListState<byte[]> sourceState;
+    private transient volatile List<byte[]> restoredState;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    private volatile boolean running = true;
+
+    public SeaTunnelParallelSource(Source<SeaTunnelRow, ?, ?> source) {
+        // TODO: Make sure the source is uncoordinated.
+        this.source = source;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.parallelSource = new ParallelSource<>(source,
+                restoredState,
+                getRuntimeContext().getNumberOfParallelSubtasks(),
+                getRuntimeContext().getIndexOfThisSubtask());
+        this.parallelSource.open();
+    }
+
+    @Override
+    public void run(SourceFunction.SourceContext<WrappedRow> sourceContext) throws Exception {
+        parallelSource.run(new WrappedRowCollector(sourceContext));
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+        try {
+            parallelSource.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        parallelSource.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        parallelSource.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
+    public TypeInformation<WrappedRow> getProducedType() {
+        return new KryoTypeInfo<>(WrappedRow.class);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
+        if (!running) {
+            LOG.debug("snapshotState() called on closed source");
+        } else {
+            sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
+        OperatorStateStore stateStore = initializeContext.getOperatorStateStore();
+        this.sourceState = stateStore.getListState(new ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        if (initializeContext.isRestored()) {
+            restoredState = new ArrayList<>();
+            // populate actual holder for restored state
+            sourceState.get().forEach(restoredState::add);
+
+            LOG.info("Consumer subtask {} restored state", getRuntimeContext().getIndexOfThisSubtask());
+        } else {
+            LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+}
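
SeaTunnelParallelSource is a plain RichParallelSourceFunction, so attaching a SeaTunnel source to a Flink job reduces to addSource(). A sketch of the wiring; the wrapper class, job name and downstream print() are placeholders, and dispatching WrappedRow by table name is left out:

    // Illustrative sketch, not part of this commit.
    import org.apache.seatunnel.api.source.Source;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
    import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class FlinkSourceExample {

        public static void run(Source<SeaTunnelRow, ?, ?> seaTunnelSource) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<WrappedRow> rows = env
                    .addSource(new SeaTunnelParallelSource(seaTunnelSource))
                    .setParallelism(2);
            // each WrappedRow carries the Flink Row plus its originating table name
            rows.print();
            env.execute("seatunnel-parallel-source-demo");
        }

        private FlinkSourceExample() {
        }
    }
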
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
new file mode 100644
index 00000000..1d0ea225
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.IOException;
+
+public class WrappedRowCollector implements Collector<SeaTunnelRow> {
+
+    protected final SourceFunction.SourceContext<WrappedRow> internalCollector;
+    protected final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+
+    public WrappedRowCollector(SourceFunction.SourceContext<WrappedRow> internalCollector) {
+        this.internalCollector = internalCollector;
+    }
+
+    @Override
+    public void collect(SeaTunnelRow record) {
+        try {
+            internalCollector.collect(new WrappedRow(rowSerialization.serialize(record), ""));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml
index 948729d0..cb3700b4 100644
--- a/seatunnel-translation/seatunnel-translation-spark/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -26,6 +26,8 @@
     <artifactId>seatunnel-translation-spark</artifactId>
 
     <properties>
+        <spark.version>2.4.0</spark.version>
+        <spark.scope>provided</spark.scope>
     </properties>
 
     <dependencies>
@@ -34,11 +36,27 @@
             <artifactId>seatunnel-translation-base</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!-- apache spark table -->
+
+        <!--spark-->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
-            <scope>provided</scope>
+            <version>${spark.version}</version>
+            <scope>${spark.scope}</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java
new file mode 100644
index 00000000..79f6aff2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source;
+
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+public class ParallelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
+
+    public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnel";
+
+    @Override
+    public String shortName() {
+        return SEA_TUNNEL_SOURCE_NAME;
+    }
+
+    @Override
+    public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+        return ReadSupport.super.createReader(schema, options);
+    }
+
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return null;
+    }
+
+    @Override
+    public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+        return null;
+    }
+
+    @Override
+    public ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+        return null;
+    }
+}
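
ParallelSourceSupport is still a skeleton: the reader factory methods return null, and Spark will also need a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entry before it can resolve the short name. Once those pieces exist, batch usage would presumably look like this sketch (app name and master are placeholders):

    // Illustrative sketch, not part of this commit; assumes the reader factory methods
    // above are implemented and the DataSourceRegister service file is present.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class SparkSourceExample {

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("seatunnel-spark-source-demo")
                    .master("local[*]")
                    .getOrCreate();
            // resolves ParallelSourceSupport through its DataSourceRegister short name
            Dataset<Row> rows = spark.read()
                    .format("SeaTunnel")
                    .load();
            rows.show();
        }
    }
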
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java
new file mode 100644
index 00000000..6ccb2a66
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java
@@ -0,0 +1,30 @@
+package org.apache.seatunnel.translation.spark.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.ParallelSource;
+
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+
+import java.util.List;
+
+public class SparkState<SplitT extends SourceSplit, StateT> extends Offset {
+
+    protected final ParallelSource<SeaTunnelRow, SplitT, StateT> parallelSource;
+    protected volatile Integer checkpointId;
+
+    public SparkState(ParallelSource<SeaTunnelRow, SplitT, StateT> parallelSource, int checkpointId) {
+        this.parallelSource = parallelSource;
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public String json() {
+        try {
+            List<byte[]> bytes = this.parallelSource.snapshotState(checkpointId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return null;
+    }
+}