You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/10 17:16:27 UTC
[incubator-seatunnel] branch api-draft updated: [Api-Draft] SeaTunnel Source support Flink engine. (#1842)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 6b5136b7 [Api-Draft] SeaTunnel Source support Flink engine. (#1842)
6b5136b7 is described below
commit 6b5136b7b015754cd2e3aea2d8735a9f82f10df2
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Wed May 11 01:16:20 2022 +0800
[Api-Draft] SeaTunnel Source support Flink engine. (#1842)
Coordinated sources are not yet supported
---
.../org/apache/seatunnel/api/source/Source.java | 3 +
.../apache/seatunnel/api/source/SourceReader.java | 36 +++--
.../api/source/SourceSplitEnumerator.java | 44 ++++---
.../seatunnel/api/table/factory/FactoryUtil.java | 1 +
.../source/CoordinatedEnumeratorContext.java | 60 +++++++++
.../translation/source/CoordinatedSource.java | 9 +-
.../source/ParallelEnumeratorContext.java | 77 +++++++++++
.../translation/source/ParallelReaderContext.java | 54 ++++++++
.../translation/source/ParallelSource.java | 145 ++++++++++++++++++++-
.../ThreadPoolExecutorFactory.java} | 17 ++-
.../flink/serialization/KryoTypeInfo.java | 41 ++++++
.../flink/serialization/WrappedRow.java} | 37 +++++-
.../flink/source/SeaTunnelParallelSource.java | 130 ++++++++++++++++++
.../flink/source/WrappedRowCollector.java | 46 +++++++
.../seatunnel-translation-spark/pom.xml | 24 +++-
.../spark/source/ParallelSourceSupport.java | 61 +++++++++
.../translation/spark/source/SparkState.java | 30 +++++
17 files changed, 782 insertions(+), 33 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
index 80cf9aaf..7d77f0d7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -39,9 +39,12 @@ public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializa
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
+ Serializer<SplitT> getSplitSerializer();
+
SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
Serializer<StateT> getEnumeratorStateSerializer();
+
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index dff0527d..8131e835 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -19,35 +19,53 @@ package org.apache.seatunnel.api.source;
import org.apache.seatunnel.api.state.CheckpointListener;
+import java.io.IOException;
import java.util.List;
-import java.util.Map;
-public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener {
+public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {
- void start(Collector<T> output) throws Exception;
+ void open();
- List<SplitT> snapshotState(long checkpointId);
+ /**
+ * Called to close the reader, in case it holds on to any resources, like threads or network
+ * connections.
+ */
+ @Override
+ void close() throws IOException;
+
+ void pollNext(Collector<T> output) throws Exception;
+
+ List<SplitT> snapshotState(long checkpointId) throws Exception;
void addSplits(List<SplitT> splits);
+ /**
+ * This method is called when the reader is notified that it will not receive any further
+ * splits.
+ *
+ * <p>It is triggered when the enumerator calls {@link
+ * SourceSplitEnumerator.Context#signalNoMoreSplits(int)} with the reader's parallel subtask.
+ */
+ void handleNoMoreSplits();
+
default void handleSourceEvent(SourceEvent sourceEvent) {
}
interface Context {
/**
- * Gets the configuration with which Flink was started.
+ * @return The index of this subtask.
*/
- Map<String, String> getConfiguration();
+ int getIndexOfSubtask();
/**
- * @return The index of this subtask.
+ * Indicator that the input has reached the end of data.
*/
- int getIndexOfSubtask();
+ void signalNoMoreElement();
/**
* Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to
- * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's
+ * the {@link SourceSplitEnumerator#handleSplitRequest(int)} method, with this reader's
* parallel subtask id and the hostname where this reader runs.
*/
void sendSplitRequest();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 68c2b694..77e75c8f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -18,17 +18,35 @@
package org.apache.seatunnel.api.source;
import org.apache.seatunnel.api.state.CheckpointListener;
-import org.apache.seatunnel.common.constants.CollectionConstants;
+import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener {
+public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends AutoCloseable, CheckpointListener {
- void handleSplitRequest(int subtaskId, String requesterHostname);
+ void open();
+
+ void run();
+
+ /**
+ * Called to close the enumerator, in case it holds on to any resources, like threads or network
+ * connections.
+ */
+ @Override
+ void close() throws IOException;
+
+ /**
+ * Add a split back to the split enumerator. It will only happen when a {@link SourceReader}
+ * fails and there are splits assigned to it after the last successful checkpoint.
+ *
+ * @param splits The split to add back to the enumerator for reassignment.
+ * @param subtaskId The id of the subtask to which the returned splits belong.
+ */
+ void addSplitsBack(List<SplitT> splits, int subtaskId);
+
+ void handleSplitRequest(int subtaskId);
void registerReader(int subtaskId);
@@ -50,24 +68,20 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> exten
/**
* Assign the splits.
- *
- * @param newSplitAssignments the new split assignments to add.
*/
- void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments);
+ void assignSplit(int subtaskId, List<SplitT> splits);
/**
* Assigns a single split.
*
* <p>When assigning multiple splits, it is more efficient to assign all of them in a single
- * call to the {@link #assignSplits} method.
+ * call to the {@link #assignSplit} method.
*
- * @param split The new split
- * @param subtask The index of the operator's parallel subtask that shall receive the split.
+ * @param split The new split
+ * @param subtaskId The index of the operator's parallel subtask that shall receive the split.
*/
- default void assignSplit(SplitT split, int subtask) {
- Map<Integer, List<SplitT>> splits = new HashMap<>(CollectionConstants.MAP_SIZE);
- splits.put(subtask, Collections.singletonList(split));
- assignSplits(splits);
+ default void assignSplit(int subtaskId, SplitT split) {
+ assignSplit(subtaskId, Collections.singletonList(split));
}
/**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 348ba654..82e84333 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -48,6 +48,7 @@ public final class FactoryUtil {
String factoryIdentifier) {
try {
+
final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
List<Source> sources = new ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
new file mode 100644
index 00000000..8c06e3c8
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedEnumeratorContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link SourceSplitEnumerator.Context} for coordinated sources.
+ *
+ * <p>Coordinated sources are not yet supported, so this context is a placeholder. It reports no
+ * parallelism and no registered readers. It rejects split assignments, no-more-splits signals and
+ * events with an {@link UnsupportedOperationException} instead of silently dropping them.
+ */
+public class CoordinatedEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+    protected final CoordinatedSource<?, SplitT, ?> coordinatedSource;
+
+    public CoordinatedEnumeratorContext(CoordinatedSource<?, SplitT, ?> coordinatedSource) {
+        this.coordinatedSource = coordinatedSource;
+    }
+
+    /**
+     * @return always {@code 0}; coordinated sources are not supported yet
+     */
+    @Override
+    public int currentParallelism() {
+        return 0;
+    }
+
+    /**
+     * @return an empty set, never {@code null}, so callers can iterate the result safely
+     */
+    @Override
+    public Set<Integer> registeredReaders() {
+        return java.util.Collections.emptySet();
+    }
+
+    /**
+     * @throws UnsupportedOperationException always; dropping the splits silently would lose data
+     */
+    @Override
+    public void assignSplit(int subtaskId, List<SplitT> splits) {
+        throw new UnsupportedOperationException("Coordinated sources are not yet supported: cannot assign splits");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void signalNoMoreSplits(int subtaskId) {
+        throw new UnsupportedOperationException("Coordinated sources are not yet supported: cannot signal no more splits");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+        throw new UnsupportedOperationException("Coordinated sources are not yet supported: cannot send source events");
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 2816df97..142bd8e3 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -17,6 +17,13 @@
package org.apache.seatunnel.translation.source;
-public class CoordinatedSource {
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Runtime wrapper for coordinated sources, the counterpart of {@link ParallelSource}.
+ *
+ * <p>Coordinated sources are not yet supported; so far this class only holds the reader registry.
+ */
+public class CoordinatedSource<T, SplitT extends SourceSplit, StateT> {
+
+    /**
+     * Registered readers, presumably keyed by subtask index (TODO confirm once this is used).
+     * The map instance is never replaced, so the field is {@code final} rather than
+     * {@code volatile}; the {@link ConcurrentHashMap} handles concurrent access to its contents.
+     */
+    protected final Map<Integer, SourceReader<T, SplitT>> readerMap = new ConcurrentHashMap<>();
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
new file mode 100644
index 00000000..8076fe1b
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelEnumeratorContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link SourceSplitEnumerator.Context} used by {@link ParallelSource}, where every parallel
+ * subtask runs its own split enumerator next to its own reader.
+ *
+ * <p>The enumerator only ever sees the local subtask:
+ * <ul>
+ *     <li>Split assignments and no-more-splits signals addressed to any other subtask are ignored.</li>
+ *     <li>The local reader is reported as registered only after {@link #register()} has been called.</li>
+ * </ul>
+ */
+public class ParallelEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
+
+    protected final ParallelSource<?, SplitT, ?> parallelSource;
+    protected final Integer parallelism;
+    protected final Integer subtaskId;
+    /** Whether the local reader has been registered; volatile because it may be read from another thread. */
+    protected volatile boolean running = false;
+
+    public ParallelEnumeratorContext(ParallelSource<?, SplitT, ?> parallelSource,
+                                     int parallelism,
+                                     int subtaskId) {
+        this.parallelSource = parallelSource;
+        this.parallelism = parallelism;
+        this.subtaskId = subtaskId;
+    }
+
+    /**
+     * @return the configured parallelism once the local reader is registered, {@code 0} before
+     */
+    @Override
+    public int currentParallelism() {
+        return running ? parallelism : 0;
+    }
+
+    /**
+     * @return a singleton set with the local subtask id once registered, an empty set before
+     */
+    @Override
+    public Set<Integer> registeredReaders() {
+        return running ? Collections.singleton(subtaskId) : Collections.emptySet();
+    }
+
+    /**
+     * Marks the local reader as registered.
+     */
+    public void register() {
+        running = true;
+    }
+
+    /**
+     * Forwards the splits to the local reader; assignments for other subtasks are ignored.
+     */
+    @Override
+    public void assignSplit(int subtaskId, List<SplitT> splits) {
+        if (this.subtaskId == subtaskId) {
+            parallelSource.addSplits(splits);
+        }
+    }
+
+    /**
+     * Tells the local reader that no more splits will arrive; signals for other subtasks are ignored.
+     */
+    @Override
+    public void signalNoMoreSplits(int subtaskId) {
+        if (this.subtaskId == subtaskId) {
+            parallelSource.handleNoMoreSplits();
+        }
+    }
+
+    /**
+     * Source events are not supported by parallel sources yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+        throw new UnsupportedOperationException("Sending source events to a reader is not supported by ParallelSource yet");
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
new file mode 100644
index 00000000..0535645f
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelReaderContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.source;
+
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+
+/**
+ * {@link SourceReader.Context} used by {@link ParallelSource}. It routes the reader's callbacks
+ * back to the owning {@link ParallelSource}, which hosts the split enumerator of the same subtask.
+ */
+public class ParallelReaderContext implements SourceReader.Context {
+
+    protected final ParallelSource<?, ?, ?> parallelSource;
+    protected final Integer subtaskId;
+
+    public ParallelReaderContext(ParallelSource<?, ?, ?> parallelSource,
+                                 Integer subtaskId) {
+        this.parallelSource = parallelSource;
+        this.subtaskId = subtaskId;
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return subtaskId;
+    }
+
+    /**
+     * Stops the owning source's polling loop.
+     */
+    @Override
+    public void signalNoMoreElement() {
+        parallelSource.handleNoMoreElement();
+    }
+
+    /**
+     * Forwards the split request to the enumerator of this subtask.
+     */
+    @Override
+    public void sendSplitRequest() {
+        parallelSource.handleSplitRequest(subtaskId);
+    }
+
+    /**
+     * Source events are not supported by parallel sources yet.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+        throw new UnsupportedOperationException("Sending source events to the coordinator is not supported by ParallelSource yet");
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 975fbfa7..3a072902 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -17,5 +17,148 @@
package org.apache.seatunnel.translation.source;
-public class ParallelSource {
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.state.CheckpointListener;
+import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements AutoCloseable, CheckpointListener {
+
+ protected final Source<T, SplitT, StateT> source;
+ protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
+ protected final ParallelReaderContext readerContext;
+ protected final Integer subtaskId;
+ protected final Integer parallelism;
+
+ protected final Serializer<SplitT> splitSerializer;
+ protected final Serializer<StateT> enumeratorStateSerializer;
+
+ protected final List<SplitT> restoredSplitState;
+
+ protected transient volatile SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
+ protected transient volatile SourceReader<T, SplitT> reader;
+ protected transient volatile ExecutorService executorService;
+
+ /**
+ * Flag indicating whether the consumer is still running.
+ */
+ private volatile boolean running = true;
+
+ public ParallelSource(Source<T, SplitT, StateT> source,
+ List<byte[]> restoredState,
+ int parallelism,
+ int subtaskId) {
+ this.source = source;
+ this.subtaskId = subtaskId;
+ this.parallelism = parallelism;
+
+ this.splitSerializer = source.getSplitSerializer();
+ this.enumeratorStateSerializer = source.getEnumeratorStateSerializer();
+ this.parallelEnumeratorContext = new ParallelEnumeratorContext<>(this, parallelism, subtaskId);
+ this.readerContext = new ParallelReaderContext(this, subtaskId);
+
+ // Create or restore split enumerator & reader
+ try {
+ if (restoredState != null && restoredState.size() > 0) {
+ StateT restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(0));
+ restoredSplitState = new ArrayList<>(restoredState.size());
+ for (int i = 1; i < restoredState.size(); i++) {
+ restoredSplitState.add(splitSerializer.deserialize(restoredState.get(i)));
+ }
+
+ splitEnumerator = source.restoreEnumerator(parallelEnumeratorContext, restoredEnumeratorState);
+ } else {
+ restoredSplitState = Collections.emptyList();
+ splitEnumerator = source.createEnumerator(parallelEnumeratorContext);
+ }
+ reader = source.createReader(readerContext);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private transient volatile Thread splitEnumeratorThread;
+
+ public void open() throws Exception {
+ executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, String.format("parallel-split-enumerator-executor-%s", subtaskId));
+ splitEnumerator.open();
+ splitEnumerator.addSplitsBack(restoredSplitState, subtaskId);
+ reader.open();
+ parallelEnumeratorContext.register();
+ splitEnumerator.registerReader(subtaskId);
+ }
+
+ public void run(Collector<T> collector) throws Exception {
+ executorService.execute(() -> splitEnumerator.run());
+ while (running) {
+ reader.pollNext(collector);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // set ourselves as not running;
+ // this would let the main discovery loop escape as soon as possible
+ running = false;
+ splitEnumerator.close();
+ reader.close();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Reader context methods
+ // --------------------------------------------------------------------------------------------
+
+ protected void handleNoMoreElement() {
+ running = false;
+ }
+
+ protected void handleSplitRequest(int subtaskId) {
+ splitEnumerator.handleSplitRequest(subtaskId);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Enumerator context methods
+ // --------------------------------------------------------------------------------------------
+
+ protected void addSplits(List<SplitT> splits) {
+ reader.addSplits(splits);
+ }
+
+ protected void handleNoMoreSplits() {
+ reader.handleNoMoreSplits();
+ }
+
+ public List<byte[]> snapshotState(long checkpointId) throws Exception {
+ StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+ byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(enumeratorState);
+ List<SplitT> splitStates = reader.snapshotState(checkpointId);
+ final List<byte[]> rawValues = new ArrayList<>(splitStates.size() + 1);
+ rawValues.add(enumeratorStateBytes);
+ for (SplitT splitState : splitStates) {
+ rawValues.add(splitSerializer.serialize(splitState));
+ }
+ return rawValues;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ splitEnumerator.notifyCheckpointComplete(checkpointId);
+ reader.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ splitEnumerator.notifyCheckpointAborted(checkpointId);
+ reader.notifyCheckpointAborted(checkpointId);
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
similarity index 55%
copy from seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
index 2816df97..cd5dffa6 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/util/ThreadPoolExecutorFactory.java
@@ -15,8 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.source;
+package org.apache.seatunnel.translation.util;
-public class CoordinatedSource {
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Static factory for the thread pools used by the translation layer.
+ */
+public final class ThreadPoolExecutorFactory {
+
+    private ThreadPoolExecutorFactory() {
+    }
+
+    /**
+     * Creates a {@link ScheduledThreadPoolExecutor} whose threads are named {@code name + "-" + n}.
+     * Here {@code n} counts up from 1 for each thread this pool creates.
+     *
+     * @param corePoolSize number of threads to keep in the pool
+     * @param name         prefix of the thread names
+     * @return a new executor; the caller is responsible for shutting it down
+     */
+    public static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(int corePoolSize, String name) {
+        AtomicInteger threadCounter = new AtomicInteger(0);
+        return new ScheduledThreadPoolExecutor(corePoolSize, runnable -> {
+            Thread thread = new Thread(runnable);
+            thread.setName(name + "-" + threadCounter.incrementAndGet());
+            return thread;
+        });
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
new file mode 100644
index 00000000..20530f24
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+/**
+ * {@link GenericTypeInfo} that always hands out a Kryo-based serializer.
+ *
+ * <p>Operator chaining lets a running job skip serialization between chained operators. However,
+ * while the Flink job initializes it checks whether a type is generic. If generic-type
+ * serialization is disabled, that check throws, even though the operator never actually
+ * serializes at run time. This type information sidesteps that failure.
+ */
+public class KryoTypeInfo<T> extends GenericTypeInfo<T> {
+
+    private static final long serialVersionUID = -4367528355992922603L;
+
+    /**
+     * @param typeClass class of the values described by this type information
+     */
+    public KryoTypeInfo(Class<T> typeClass) {
+        super(typeClass);
+    }
+
+    /**
+     * Always builds a {@link KryoSerializer} for the type class using the given config. This
+     * override is what keeps job initialization from failing when generic types are disabled.
+     */
+    @Override
+    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+        Class<T> valueClass = getTypeClass();
+        return new KryoSerializer<>(valueClass, config);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 50%
copy from seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
copy to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
index 2816df97..9c7af59a 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
@@ -15,8 +15,41 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.source;
+package org.apache.seatunnel.translation.flink.serialization;
-public class CoordinatedSource {
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.Row;
+/**
+ * A {@link Row} paired with the name of the table it was read from.
+ *
+ * <p>The table name is kept so that the Dispatcher can route the row to the matching data stream.
+ * The pair is stored as a {@link Tuple2}: {@code f0} holds the row and {@code f1} holds the table name.
+ */
+public class WrappedRow extends Tuple2<Row, String> {
+
+    private static final long serialVersionUID = -8325988931728734377L;
+
+    /** Creates an instance with neither row nor table set. */
+    public WrappedRow() {
+    }
+
+    /**
+     * @param row   the wrapped row
+     * @param table name of the table the row belongs to
+     */
+    public WrappedRow(Row row, String table) {
+        super(row, table);
+    }
+
+    /** @return the wrapped row (tuple field {@code f0}) */
+    public Row getRow() {
+        return f0;
+    }
+
+    /** @param row the row to store in tuple field {@code f0} */
+    public void setRow(Row row) {
+        f0 = row;
+    }
+
+    /** @return the table name (tuple field {@code f1}) */
+    public String getTable() {
+        return f1;
+    }
+
+    /** @param table the table name to store in tuple field {@code f1} */
+    public void setTable(String table) {
+        f1 = table;
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
new file mode 100644
index 00000000..a51fbc7a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.KryoTypeInfo;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.translation.source.ParallelSource;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Flink source function that runs a SeaTunnel {@link Source} through a {@link ParallelSource},
+ * with one {@link ParallelSource} per parallel subtask.
+ *
+ * <p>Rows are emitted as {@link WrappedRow}s. The state returned by
+ * {@code ParallelSource#snapshotState} is kept in Flink operator list state as raw bytes and
+ * handed back to the {@link ParallelSource} on restore.
+ */
+public class SeaTunnelParallelSource extends RichParallelSourceFunction<WrappedRow>
+    implements CheckpointListener, ResultTypeQueryable<WrappedRow>, CheckpointedFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelParallelSource.class);
+    protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
+
+    protected final Source<SeaTunnelRow, ?, ?> source;
+
+    /**
+     * Per-subtask runtime wrapper, created in {@link #open(Configuration)}. Transient so it is not
+     * part of the serialized function shipped to the task.
+     */
+    protected transient volatile ParallelSource<SeaTunnelRow, ?, ?> parallelSource;
+
+    protected transient ListState<byte[]> sourceState;
+    private transient volatile List<byte[]> restoredState;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    private volatile boolean running = true;
+
+    /**
+     * Ensures {@link #parallelSource} is closed at most once, since both {@link #cancel()} and
+     * {@link #close()} may run for the same subtask.
+     */
+    private final AtomicBoolean parallelSourceClosed = new AtomicBoolean(false);
+
+    public SeaTunnelParallelSource(Source<SeaTunnelRow, ?, ?> source) {
+        // TODO: Make sure the source is uncoordinated.
+        this.source = source;
+    }
+
+    /**
+     * Creates and opens this subtask's {@link ParallelSource}, passing any state restored in
+     * {@link #initializeState(FunctionInitializationContext)} ({@code null} on a fresh start).
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.parallelSource = new ParallelSource<>(source,
+            restoredState,
+            getRuntimeContext().getNumberOfParallelSubtasks(),
+            getRuntimeContext().getIndexOfThisSubtask());
+        this.parallelSource.open();
+    }
+
+    /** Runs the parallel source, forwarding every produced row to Flink as a {@link WrappedRow}. */
+    @Override
+    public void run(SourceFunction.SourceContext<WrappedRow> sourceContext) throws Exception {
+        parallelSource.run(new WrappedRowCollector(sourceContext));
+    }
+
+    /**
+     * Stops the source by closing the {@link ParallelSource}. Wraps an {@link IOException} from
+     * closing in a {@link RuntimeException}.
+     */
+    @Override
+    public void cancel() {
+        running = false;
+        try {
+            closeParallelSource();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Releases the {@link ParallelSource} when the task shuts down. Without this, a source that
+     * finishes normally or fails (rather than being cancelled) would never be closed.
+     */
+    @Override
+    public void close() throws Exception {
+        running = false;
+        try {
+            closeParallelSource();
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Closes {@link #parallelSource} at most once; does nothing if {@link #open(Configuration)} has
+     * not created it yet.
+     */
+    private void closeParallelSource() throws IOException {
+        ParallelSource<SeaTunnelRow, ?, ?> current = parallelSource;
+        if (current != null && parallelSourceClosed.compareAndSet(false, true)) {
+            current.close();
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        parallelSource.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        parallelSource.notifyCheckpointAborted(checkpointId);
+    }
+
+    @Override
+    public TypeInformation<WrappedRow> getProducedType() {
+        return new KryoTypeInfo<>(WrappedRow.class);
+    }
+
+    /** Stores the {@link ParallelSource} snapshot for this checkpoint in the operator list state. */
+    @Override
+    public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
+        if (!running) {
+            LOG.debug("snapshotState() called on closed source");
+        } else {
+            sourceState.update(parallelSource.snapshotState(snapshotContext.getCheckpointId()));
+        }
+    }
+
+    /**
+     * Registers the operator list state and, when restoring, copies its entries into
+     * {@code restoredState} for {@link #open(Configuration)} to pass on.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
+        OperatorStateStore stateStore = initializeContext.getOperatorStateStore();
+        this.sourceState = stateStore.getListState(new ListStateDescriptor<>(PARALLEL_SOURCE_STATE_NAME, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
+        if (initializeContext.isRestored()) {
+            restoredState = new ArrayList<>();
+            // populate actual holder for restored state
+            sourceState.get().forEach(restoredState::add);
+
+            LOG.info("Consumer subtask {} restored state", getRuntimeContext().getIndexOfThisSubtask());
+        } else {
+            LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
new file mode 100644
index 00000000..1d0ea225
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/WrappedRowCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.IOException;
+
+/**
+ * Adapts a Flink {@link SourceFunction.SourceContext} to the SeaTunnel {@link Collector} API:
+ * each {@link SeaTunnelRow} is converted to a Flink row and emitted as a {@link WrappedRow}.
+ */
+public class WrappedRowCollector implements Collector<SeaTunnelRow> {
+
+    protected final SourceFunction.SourceContext<WrappedRow> internalCollector;
+    protected final FlinkRowSerialization rowSerialization = new FlinkRowSerialization();
+
+    public WrappedRowCollector(SourceFunction.SourceContext<WrappedRow> internalCollector) {
+        this.internalCollector = internalCollector;
+    }
+
+    /**
+     * Converts {@code record} to a {@link WrappedRow} and emits it downstream.
+     *
+     * <p>Emission happens while holding the source context's checkpoint lock, as Flink requires
+     * for checkpointed sources, so it never runs concurrently with a state snapshot. Conversion
+     * is done before taking the lock to keep the critical section short.
+     * <p>NOTE(review): state the reader updates outside this call is not covered by the lock;
+     * confirm how {@code ParallelSource} orders emission and state updates.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if conversion fails
+     */
+    @Override
+    public void collect(SeaTunnelRow record) {
+        final WrappedRow wrappedRow;
+        try {
+            // The table name is not filled in yet: every row is tagged with "".
+            wrappedRow = new WrappedRow(rowSerialization.serialize(record), "");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        synchronized (internalCollector.getCheckpointLock()) {
+            internalCollector.collect(wrappedRow);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml
index 948729d0..cb3700b4 100644
--- a/seatunnel-translation/seatunnel-translation-spark/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -26,6 +26,8 @@
<artifactId>seatunnel-translation-spark</artifactId>
<properties>
+ <spark.version>2.4.0</spark.version>
+ <spark.scope>provided</spark.scope>
</properties>
<dependencies>
@@ -34,11 +36,27 @@
<artifactId>seatunnel-translation-base</artifactId>
<version>${project.version}</version>
</dependency>
- <!-- apache spark table -->
+
+ <!--spark-->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
- <scope>provided</scope>
+ <version>${spark.version}</version>
+ <scope>${spark.scope}</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java
new file mode 100644
index 00000000..79f6aff2
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/ParallelSourceSupport.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source;
+
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.MicroBatchReadSupport;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+/**
+ * Spark DataSource V2 entry point for SeaTunnel sources, registered under the short name
+ * {@value #SEA_TUNNEL_SOURCE_NAME}.
+ *
+ * <p>Work in progress: batch, micro-batch and continuous readers are not implemented yet, so the
+ * reader factories fail fast with {@link UnsupportedOperationException} instead of returning
+ * {@code null}, which would only fail later, far away from the cause.
+ */
+public class ParallelSourceSupport implements DataSourceV2, ReadSupport, MicroBatchReadSupport, ContinuousReadSupport, DataSourceRegister {
+
+    public static final String SEA_TUNNEL_SOURCE_NAME = "SeaTunnel";
+
+    @Override
+    public String shortName() {
+        return SEA_TUNNEL_SOURCE_NAME;
+    }
+
+    /**
+     * Delegates to the {@link ReadSupport} default (in Spark 2.4 it rejects user-specified schemas).
+     */
+    @Override
+    public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
+        return ReadSupport.super.createReader(schema, options);
+    }
+
+    /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        throw notImplemented("Batch reading");
+    }
+
+    /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+        throw notImplemented("Micro-batch reading");
+    }
+
+    /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
+        throw notImplemented("Continuous reading");
+    }
+
+    /** Builds the exception thrown by reader factories that have no implementation yet. */
+    private static UnsupportedOperationException notImplemented(String mode) {
+        return new UnsupportedOperationException(
+            mode + " is not supported by the " + SEA_TUNNEL_SOURCE_NAME + " source yet");
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java
new file mode 100644
index 00000000..6ccb2a66
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SparkState.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.spark.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.translation.source.ParallelSource;
+
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.StringJoiner;
+
+/**
+ * Spark streaming {@link Offset} backed by a snapshot of a {@link ParallelSource}.
+ *
+ * <p>{@link #json()} snapshots the source for {@code checkpointId} on its first call and encodes
+ * the result as {@code {"checkpointId":<id>,"states":["<base64>", ...]}}. The string is cached
+ * because Spark's {@code Offset} derives {@code equals}, {@code hashCode} and {@code toString}
+ * from {@code json()}, so it must be non-null and must not change between calls.
+ *
+ * <p>NOTE(review): nothing decodes this format yet; keep any future reader in sync with it.
+ *
+ * @param <SplitT> split type of the wrapped source
+ * @param <StateT> state type of the wrapped source
+ */
+public class SparkState<SplitT extends SourceSplit, StateT> extends Offset {
+
+    protected final ParallelSource<SeaTunnelRow, SplitT, StateT> parallelSource;
+    protected volatile Integer checkpointId;
+
+    /** JSON form of the snapshot, built on the first {@link #json()} call. */
+    private String cachedJson;
+
+    public SparkState(ParallelSource<SeaTunnelRow, SplitT, StateT> parallelSource, int checkpointId) {
+        this.parallelSource = parallelSource;
+        this.checkpointId = checkpointId;
+    }
+
+    /**
+     * Returns the JSON form of this offset, snapshotting the source on the first call.
+     *
+     * @throws RuntimeException if snapshotting the parallel source fails
+     */
+    @Override
+    public synchronized String json() {
+        if (cachedJson == null) {
+            cachedJson = toJson(snapshot());
+        }
+        return cachedJson;
+    }
+
+    /** Snapshots the parallel source for {@code checkpointId}, wrapping any failure. */
+    private List<byte[]> snapshot() {
+        try {
+            return parallelSource.snapshotState(checkpointId);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to snapshot state for checkpoint " + checkpointId, e);
+        }
+    }
+
+    /** Encodes the snapshot as JSON; Base64 output contains no characters that need escaping. */
+    private String toJson(List<byte[]> states) {
+        Base64.Encoder encoder = Base64.getEncoder();
+        StringJoiner encoded = new StringJoiner(",", "[", "]");
+        if (states != null) {
+            for (byte[] state : states) {
+                encoded.add(state == null ? "null" : '"' + encoder.encodeToString(state) + '"');
+            }
+        }
+        return "{\"checkpointId\":" + checkpointId + ",\"states\":" + encoded + "}";
+    }
+}