You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/28 06:40:31 UTC
[incubator-seatunnel] 02/02: [Feature][core] base logic
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 5eb77fc3050bf6306f25843aba6a2c3020e014fa
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Apr 28 10:29:44 2022 +0800
[Feature][core] base logic
---
pom.xml | 1 +
.../Serializer.java} | 26 ++--
.../java/org/apache/seatunnel/api/sink/Sink.java | 32 ++++-
.../api/sink/SinkAggregatedCommitter.java | 10 +-
.../apache/seatunnel/api/sink/SinkCommitter.java | 11 +-
.../org/apache/seatunnel/api/sink/SinkWriter.java | 44 +++++-
.../org/apache/seatunnel/api/source/Collector.java | 5 +-
.../org/apache/seatunnel/api/source/Source.java | 18 ++-
.../apache/seatunnel/api/source/SourceEvent.java | 7 +-
.../apache/seatunnel/api/source/SourceReader.java | 41 +++++-
.../apache/seatunnel/api/source/SourceSplit.java | 4 +-
.../api/source/SourceSplitEnumerator.java | 68 +++++++++-
.../{Collector.java => SupportCoordinate.java} | 5 +-
.../seatunnel/api/state/CheckpointListener.java | 5 +
.../seatunnel/api/state/StatefulOperator.java | 21 ---
.../apache/seatunnel/api/table/catalog/Column.java | 37 +++--
.../api/table/catalog/TableIdentifier.java | 2 +-
.../seatunnel/api/table/catalog/TableSchema.java | 4 +-
.../table/catalog/exception/CatalogException.java | 14 +-
.../table/connector/SupportReadingMetadata.java | 1 -
.../seatunnel/api/table/connector/TableSink.java | 2 +-
.../seatunnel/api/table/connector/TableSource.java | 2 +-
.../api/table/factory/CatalogFactory.java | 11 +-
.../factory/{CatalogFactory.java => Factory.java} | 11 +-
...TableFactoryUtil.java => FactoryException.java} | 9 +-
.../seatunnel/api/table/factory/FactoryUtil.java | 150 +++++++++++++++++++++
.../api/table/factory/SupportMultipleTable.java | 2 +-
...{TableFactory.java => TableFactoryContext.java} | 51 ++++---
.../api/table/factory/TableSinkFactory.java | 4 +-
.../api/table/factory/TableSourceFactory.java | 4 +-
seatunnel-translation/pom.xml | 21 +++
.../seatunnel-translation-base/pom.xml | 26 ++++
.../serialization/RowSerialization.java | 26 ++--
.../translation/source/CoordinatedSource.java | 5 +-
.../translation/source/ParallelSource.java | 4 +-
.../seatunnel-translation-flink/pom.xml | 32 +++++
.../flink/serialization/FlinkRowSerialization.java | 19 +++
.../seatunnel-translation-spark/pom.xml | 24 ++++
38 files changed, 630 insertions(+), 129 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5804f998..ed98a286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
<module>seatunnel-examples</module>
<module>seatunnel-e2e</module>
<module>seatunnel-api</module>
+ <module>seatunnel-translation</module>
</modules>
<properties>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
similarity index 57%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index 9742f1df..61703486 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -15,25 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.serialization;
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import java.io.IOException;
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Serializer<T> {
/**
- * Returns a unique identifier among same factory interfaces.
+ * Serializes the given object.
*
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
+ * @param obj The object to serialize.
+ * @return The serialized data (bytes).
+ * @throws IOException Thrown, if the serialization fails.
*/
- String factoryIdentifier();
+ byte[] serialize(T obj) throws IOException;
/**
- * Creates a {@link Catalog} using the options.
+ * De-serializes the given data (bytes).
+ *
+ * @param serialized The serialized data
+ * @return The deserialized object
+ * @throws IOException Thrown, if the deserialization fails.
*/
- Catalog createCatalog(String catalogName, Map<String, String> options);
+ T deserialize(byte[] serialized) throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
index 9e1d73e6..2c1fedef 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -17,8 +17,38 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+
+ SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException;
+
+ default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+ return createWriter(context);
+ }
+
+ default Optional<Serializer<StateT>> getWriterStateSerializer() {
+ return Optional.empty();
+ }
+
+ default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
+ return Optional.empty();
+ }
+
+ default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
+ return Optional.empty();
+ }
-public interface Sink<IN> extends Serializable {
+ default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException {
+ return Optional.empty();
+ }
+ default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
+ return Optional.empty();
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index e4947fcb..1f38efa3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -17,5 +17,13 @@
package org.apache.seatunnel.api.sink;
-public interface SinkAggregatedCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+
+ List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
+ throws IOException, InterruptedException;
+
+ void abort() throws Exception;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 176cca65..2dff2525 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -17,5 +17,14 @@
package org.apache.seatunnel.api.sink;
-public interface SinkCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkCommitter<CommitInfoT> {
+
+ List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+
+ List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+
+ void abort() throws Exception;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 563d9fa0..933bfeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -17,5 +17,47 @@
package org.apache.seatunnel.api.sink;
-public interface SinkWriter {
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface SinkWriter<T, StateT> {
+ void write(T element) throws IOException, InterruptedException;
+
+ /**
+ * @return The writer's state.
+ * @throws IOException if snapshotting the writer's state fails.
+ * @deprecated implement {@link #snapshotState(long)}
+ */
+ default List<StateT> snapshotState() throws IOException {
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return The writer's state.
+ * @throws IOException if snapshotting the writer's state fails.
+ */
+ default List<StateT> snapshotState(long checkpointId) throws IOException {
+ return snapshotState();
+ }
+
+ interface Context {
+
+ /**
+ * Gets the configuration with which Flink was started.
+ */
+ Map<String, String> getConfiguration();
+
+ /**
+ * @return The index of this subtask.
+ */
+ int getIndexOfSubtask();
+
+ /**
+ * @return The number of parallel Sink tasks.
+ */
+ int getNumberOfParallelSubtasks();
+
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index 5a84fbf7..f36a7f87 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,5 +17,8 @@
package org.apache.seatunnel.api.source;
-public interface Collector {
+public interface Collector<T> {
+
+ void collect(T record);
+
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
index 6dd2e74a..80cf9aaf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -17,17 +17,31 @@
package org.apache.seatunnel.api.source;
+import org.apache.seatunnel.api.serialization.Serializer;
+
import java.io.Serializable;
/**
* The interface for Source. It acts like a factory class that helps construct the {@link
* SourceSplitEnumerator} and {@link SourceReader} and corresponding serializers.
*
- * @param <T> The type of records produced by the source.
+ * @param <T> The type of records produced by the source.
* @param <SplitT> The type of splits handled by the source.
- * @param <StateT> The type of state to store.
*/
public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
+ /**
+ * Get the boundedness of this source.
+ *
+ * @return the boundedness of this source.
+ */
+ Boundedness getBoundedness();
+
+ SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
+
+ SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
+
+ SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
+ Serializer<StateT> getEnumeratorStateSerializer();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
index e1255b95..4d2374b4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -17,5 +17,10 @@
package org.apache.seatunnel.api.source;
-public interface SourceEvent {
+import java.io.Serializable;
+
+/**
+ * A base interface for the events passed between the SourceReaders and Enumerators.
+ */
+public interface SourceEvent extends Serializable {
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index 7e784f17..dff0527d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -17,9 +17,46 @@
package org.apache.seatunnel.api.source;
-public interface SourceReader {
+import org.apache.seatunnel.api.state.CheckpointListener;
- interface SupportCoordinate {
+import java.util.List;
+import java.util.Map;
+public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener {
+
+ void start(Collector<T> output) throws Exception;
+
+ List<SplitT> snapshotState(long checkpointId);
+
+ void addSplits(List<SplitT> splits);
+
+ default void handleSourceEvent(SourceEvent sourceEvent) {
+ }
+
+ interface Context {
+
+ /**
+ * Gets the configuration with which Flink was started.
+ */
+ Map<String, String> getConfiguration();
+
+ /**
+ * @return The index of this subtask.
+ */
+ int getIndexOfSubtask();
+
+ /**
+ * Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to
+ * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's
+ * parallel subtask id and the hostname where this reader runs.
+ */
+ void sendSplitRequest();
+
+ /**
+ * Send a source event to the source coordinator.
+ *
+ * @param sourceEvent the source event to send to the coordinator.
+ */
+ void sendSourceEventToCoordinator(SourceEvent sourceEvent);
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
index e7b6b9b1..af6e1acf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.api.source;
-/** An interface for all the Split types to extend. */
+/**
+ * An interface for all the Split types to extend.
+ */
public interface SourceSplit {
/**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 0dff0b1d..1c2291fa 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -17,9 +17,73 @@
package org.apache.seatunnel.api.source;
-public interface SourceSplitEnumerator {
+import org.apache.seatunnel.api.state.CheckpointListener;
- interface SupportCoordinate {
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener {
+
+ void handleSplitRequest(int subtaskId, String requesterHostname);
+
+ void registerReader(int subtaskId);
+
+ StateT snapshotState(long checkpointId) throws Exception;
+
+ default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
+
+ interface Context<SplitT extends SourceSplit> {
+
+ int currentParallelism();
+
+ /**
+ * Get the currently registered readers. The returned set contains the subtask ids of those readers.
+ *
+ * @return the currently registered readers.
+ */
+ Set<Integer> registeredReaders();
+
+ /**
+ * Assign the splits.
+ *
+ * @param newSplitAssignments the new split assignments to add.
+ */
+ void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments);
+
+ /**
+ * Assigns a single split.
+ *
+ * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+ * call to the {@link #assignSplits} method.
+ *
+ * @param split The new split
+ * @param subtask The index of the operator's parallel subtask that shall receive the split.
+ */
+ default void assignSplit(SplitT split, int subtask) {
+ Map<Integer, List<SplitT>> splits = new HashMap<>();
+ splits.put(subtask, Collections.singletonList(split));
+ assignSplits(splits);
+ }
+
+ /**
+ * Signals a subtask that it will not receive any further split.
+ *
+ * @param subtask The index of the operator's parallel subtask that shall be signaled it will
+ * not receive any further split.
+ */
+ void signalNoMoreSplits(int subtask);
+
+ /**
+ * Send a source event to a source reader. The source reader is identified by its subtask id.
+ *
+ * @param subtaskId the subtask id of the source reader to send this event to.
+ * @param event the source event to send.
+ */
+ void sendEventToSourceReader(int subtaskId, SourceEvent event);
+ }
+
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
similarity index 88%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
index 5a84fbf7..2de9a3af 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
@@ -17,5 +17,8 @@
package org.apache.seatunnel.api.source;
-public interface Collector {
+/**
+ * Used to mark whether the interface supports coordination.
+ */
+public interface SupportCoordinate {
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
index a48d256c..8abf1398 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
@@ -18,4 +18,9 @@
package org.apache.seatunnel.api.state;
public interface CheckpointListener {
+
+ void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+ default void notifyCheckpointAborted(long checkpointId) throws Exception {
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
deleted file mode 100644
index fb09a761..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.state;
-
-public interface StatefulOperator {
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 7e6a72a3..a8211f3e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -17,6 +17,7 @@
*/
package org.apache.seatunnel.api.table.catalog;
+
import org.apache.seatunnel.api.table.type.DataType;
import java.util.Objects;
@@ -36,7 +37,9 @@ public abstract class Column {
this.comment = comment;
}
- /** Creates a regular table column that represents physical data. */
+ /**
+ * Creates a regular table column that represents physical data.
+ */
public static PhysicalColumn physical(String name, DataType dataType) {
return new PhysicalColumn(name, dataType);
}
@@ -52,8 +55,10 @@ public abstract class Column {
return new MetadataColumn(name, dataType, metadataKey);
}
- /** Add the comment to the column and return the new object. */
- public abstract Column withComment( String comment);
+ /**
+ * Add the comment to the column and return the new object.
+ */
+ public abstract Column withComment(String comment);
/**
* Returns whether the given column is a physical column of a table; neither computed nor
@@ -61,22 +66,30 @@ public abstract class Column {
*/
public abstract boolean isPhysical();
- /** Returns the data type of this column. */
+ /**
+ * Returns the data type of this column.
+ */
public DataType getDataType() {
return this.dataType;
}
- /** Returns the name of this column. */
+ /**
+ * Returns the name of this column.
+ */
public String getName() {
return name;
}
- /** Returns the comment of this column. */
+ /**
+ * Returns the comment of this column.
+ */
public Optional<String> getComment() {
return Optional.ofNullable(comment);
}
- /** Returns a copy of the column with a replaced {@link DataType}. */
+ /**
+ * Returns a copy of the column with a replaced {@link DataType}.
+ */
public abstract Column copy(DataType newType);
@Override
@@ -102,7 +115,9 @@ public abstract class Column {
// Specific kinds of columns
// --------------------------------------------------------------------------------------------
- /** Representation of a physical column. */
+ /**
+ * Representation of a physical column.
+ */
public static final class PhysicalColumn extends Column {
private PhysicalColumn(String name, DataType dataType) {
@@ -132,7 +147,9 @@ public abstract class Column {
}
}
- /** Representation of a metadata column. */
+ /**
+ * Representation of a metadata column.
+ */
public static final class MetadataColumn extends Column {
private final String metadataKey;
@@ -156,7 +173,7 @@ public abstract class Column {
}
@Override
- public MetadataColumn withComment( String comment) {
+ public MetadataColumn withComment(String comment) {
if (comment == null) {
return this;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index da7b02f3..e3c60ada 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -22,7 +22,7 @@ import java.util.Objects;
public final class TableIdentifier implements Serializable {
private static final long serialVersionUID = 1L;
-
+
private final String catalogName;
private final String databaseName;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index ec47b0ff..ac46fd8a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -32,7 +32,9 @@ public final class TableSchema implements Serializable {
return new TableSchema(columns);
}
- /** Returns all {@link Column}s of this schema. */
+ /**
+ * Returns all {@link Column}s of this schema.
+ */
public List<Column> getColumns() {
return columns;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
index 31b526bc..d91e6670 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
@@ -17,22 +17,28 @@
package org.apache.seatunnel.api.table.catalog.exception;
-/** A catalog-related, runtime exception. */
+/**
+ * A catalog-related, runtime exception.
+ */
public class CatalogException extends RuntimeException {
- /** @param message the detail message. */
+ /**
+ * @param message the detail message.
+ */
public CatalogException(String message) {
super(message);
}
- /** @param cause the cause. */
+ /**
+ * @param cause the cause.
+ */
public CatalogException(Throwable cause) {
super(cause);
}
/**
* @param message the detail message.
- * @param cause the cause.
+ * @param cause the cause.
*/
public CatalogException(String message, Throwable cause) {
super(message, cause);
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index aef9ba52..3fc90e17 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -25,7 +25,6 @@ import java.util.Map;
/**
* Used for {@link TableSource} to support metadata columns.
- *
*/
public interface SupportReadingMetadata {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index a7a9bf4e..fe0be552 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.sink.Sink;
public interface TableSink {
- Sink<?> createSink();
+ Sink<?, ?, ?, ?> createSink();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index a1290f81..2267ab1f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.source.Source;
public interface TableSource {
- Source<?,?,?> createSource();
+ Source<?, ?, ?> createSource();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
index 9742f1df..2448c77e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
@@ -21,16 +21,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import java.util.Map;
-public interface CatalogFactory {
-
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+public interface CatalogFactory extends Factory {
/**
* Creates a {@link Catalog} using the options.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
similarity index 82%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 9742f1df..59b0fd51 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,11 +17,7 @@
package org.apache.seatunnel.api.table.factory;
-import org.apache.seatunnel.api.table.catalog.Catalog;
-
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Factory {
/**
* Returns a unique identifier among same factory interfaces.
@@ -31,9 +27,4 @@ public interface CatalogFactory {
* using "-" (e.g. {@code elasticsearch-7}).
*/
String factoryIdentifier();
-
- /**
- * Creates a {@link Catalog} using the options.
- */
- Catalog createCatalog(String catalogName, Map<String, String> options);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
similarity index 78%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
index 67fcb835..64e5214c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
@@ -17,6 +17,13 @@
package org.apache.seatunnel.api.table.factory;
-public final class TableFactoryUtil {
+public class FactoryException extends RuntimeException {
+ public FactoryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public FactoryException(String message) {
+ super(message);
+ }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
new file mode 100644
index 00000000..2b7ba445
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/**
+ * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and {@link CatalogFactory}.
+ *
+ * <p>All {@link Factory} implementations are loaded through {@link ServiceLoader} and then
+ * narrowed down by factory type and by {@link Factory#factoryIdentifier()}.
+ */
+public final class FactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+    /** Static utility class; not meant to be instantiated. */
+    private FactoryUtil() {
+    }
+
+    /**
+     * Creates the sources for the given tables using the {@link TableSourceFactory} that is
+     * registered under {@code factoryIdentifier}.
+     *
+     * <p>Only factories that also implement {@link SupportMultipleTable} are handled so far: the
+     * factory picks its tables through {@code applyTables} and one source is created for the
+     * accepted tables. For any other factory the returned list is empty.
+     *
+     * @param multipleTables tables offered to the factory
+     * @param options options handed to the factory through the {@link TableFactoryContext}
+     * @param classLoader class loader used for SPI discovery and passed on to the factory context
+     * @param factoryIdentifier identifier of the {@link TableSourceFactory} to use
+     * @return the created sources; empty when the factory does not implement
+     *     {@link SupportMultipleTable}
+     * @throws FactoryException if anything fails while discovering the factory or creating the
+     *     source; the original throwable is kept as the cause
+     */
+    public static List<Source> createAndPrepareSource(
+            List<CatalogTable> multipleTables,
+            Map<String, String> options,
+            ClassLoader classLoader,
+            String factoryIdentifier) {
+
+        try {
+            final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
+            List<Source> sources = new ArrayList<>(multipleTables.size());
+            if (factory instanceof SupportMultipleTable) {
+                TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
+                SupportMultipleTable multipleTableSourceFactory = ((SupportMultipleTable) factory);
+                // TODO: create all source
+                SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
+                // The source is built only from the tables the factory accepted.
+                TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
+                // TODO: handle reading metadata
+                Source<?, ?, ?> source = multipleTableSource.createSource();
+                sources.add(source);
+            }
+            return sources;
+        } catch (Throwable t) {
+            throw new FactoryException(
+                    String.format(
+                            "Unable to create a source for identifier '%s'.", factoryIdentifier),
+                    t);
+        }
+    }
+
+    /**
+     * Placeholder for sink creation.
+     *
+     * <p>NOTE(review): not implemented yet; this always returns {@code null}, so callers must not
+     * dereference the result.
+     *
+     * @return always {@code null}
+     */
+    public static List<Sink> createAndPrepareSink() {
+        return null;
+    }
+
+    /**
+     * Creates a {@link Catalog} using the {@link CatalogFactory} registered under
+     * {@code factoryIdentifier}.
+     *
+     * @param catalogName name passed to {@link CatalogFactory#createCatalog}
+     * @param options options passed to {@link CatalogFactory#createCatalog}
+     * @param classLoader class loader used for SPI discovery
+     * @param factoryIdentifier identifier of the {@link CatalogFactory} to use
+     * @return the catalog returned by the factory
+     * @throws FactoryException if no unique matching factory can be found
+     */
+    public static Catalog createCatalog(String catalogName,
+                                        Map<String, String> options,
+                                        ClassLoader classLoader,
+                                        String factoryIdentifier) {
+        CatalogFactory catalogFactory = discoverFactory(classLoader, CatalogFactory.class, factoryIdentifier);
+        return catalogFactory.createCatalog(catalogName, options);
+    }
+
+    /**
+     * Finds the single factory of type {@code factoryClass} whose
+     * {@link Factory#factoryIdentifier()} equals {@code factoryIdentifier}.
+     *
+     * @param classLoader class loader used for SPI discovery
+     * @param factoryClass factory type the result must implement
+     * @param factoryIdentifier identifier the factory must report
+     * @param <T> requested factory type
+     * @return the unique matching factory
+     * @throws FactoryException if no factory of the requested type exists, if none matches the
+     *     identifier, or if more than one matches
+     */
+    public static <T extends Factory> T discoverFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
+        final List<Factory> factories = discoverFactories(classLoader);
+
+        // First narrow by type, so the "available identifiers" message below lists only
+        // factories of the requested kind.
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(factoryClass::isInstance)
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factories that implement '%s' in the classpath.",
+                            factoryClass.getName()));
+        }
+
+        final List<Factory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
+                                    + "Available factory identifiers are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            foundFactories.stream()
+                                    .map(Factory::factoryIdentifier)
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        if (matchingFactories.size() > 1) {
+            throw new FactoryException(
+                    String.format(
+                            "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        // Every element passed the isInstance filter above, so this checked cast cannot fail.
+        return factoryClass.cast(matchingFactories.get(0));
+    }
+
+    /**
+     * Loads every {@link Factory} registered for the given class loader through
+     * {@link ServiceLoader}.
+     *
+     * @param classLoader class loader handed to {@link ServiceLoader#load(Class, ClassLoader)}
+     * @return all discovered factories
+     * @throws FactoryException if a provider cannot be loaded; the error is also logged
+     */
+    private static List<Factory> discoverFactories(ClassLoader classLoader) {
+        try {
+            final List<Factory> result = new LinkedList<>();
+            // Providers are instantiated lazily while iterating, so the iteration has to stay
+            // inside the try block for the ServiceConfigurationError to be caught.
+            ServiceLoader.load(Factory.class, classLoader)
+                    .iterator()
+                    .forEachRemaining(result::add);
+            return result;
+        } catch (ServiceConfigurationError e) {
+            LOG.error("Could not load service provider for factories.", e);
+            throw new FactoryException("Could not load service provider for factories.", e);
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
index 7c34c354..6b15786a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
@@ -30,7 +30,7 @@ public interface SupportMultipleTable {
/**
* A connector can pick tables and return the accepted and remaining tables.
*/
- Result applyTables(TableFactory.Context context);
+ Result applyTables(TableFactoryContext context);
final class Result {
private final List<CatalogTable> acceptedTables;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
similarity index 51%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index b531e71e..6caf0b89 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -22,31 +22,38 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import java.util.List;
import java.util.Map;
-public interface TableFactory {
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+public class TableFactoryContext {
+
+ private final List<CatalogTable> catalogTables;
+ private final Map<String, String> options;
+ private final ClassLoader classLoader;
+
+ public TableFactoryContext(
+ List<CatalogTable> catalogTables,
+ Map<String, String> options,
+ ClassLoader classLoader) {
+ this.catalogTables = catalogTables;
+ this.options = options;
+ this.classLoader = classLoader;
+ }
- /** Provides information describing the multi-table to be accessed. */
- interface Context {
- ClassLoader getClassLoader();
+ public ClassLoader getClassLoader() {
+ return this.classLoader;
+ }
- /**
- * Returns a list of tables that need to be processed.
- *
- * <p> By default, return only single table.
- *
- * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
- */
- List<CatalogTable> getCatalogTable();
+ /**
+ * Returns a list of tables that need to be processed.
+ *
+ * <p> By default, returns only a single table.
+ *
+ * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
+ */
+ public List<CatalogTable> getCatalogTables() {
+ return catalogTables;
+ }
- /** Gives read-only access to the configuration of the current session. */
- Map<String, String> getOptions();
+ public Map<String, String> getOptions() {
+ return this.options;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 33319659..bb92a76a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.table.connector.TableSink;
-public interface TableSinkFactory extends TableFactory {
+public interface TableSinkFactory extends Factory {
- TableSink createSink(TableFactory.Context context);
+ TableSink createSink(TableFactoryContext context);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 8318ab3a..241deeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.table.connector.TableSource;
-public interface TableSourceFactory extends TableFactory {
+public interface TableSourceFactory extends Factory {
- TableSource createSource(TableFactory.Context context);
+ TableSource createSource(TableFactoryContext context);
}
diff --git a/seatunnel-translation/pom.xml b/seatunnel-translation/pom.xml
new file mode 100644
index 00000000..3a44957b
--- /dev/null
+++ b/seatunnel-translation/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-translation</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>seatunnel-translation-base</module>
+ <module>seatunnel-translation-flink</module>
+ <module>seatunnel-translation-spark</module>
+ </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-base/pom.xml b/seatunnel-translation/seatunnel-translation-base/pom.xml
new file mode 100644
index 00000000..f9bbaff4
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-translation-base</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
similarity index 55%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
index 9742f1df..06bd52e9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
@@ -15,25 +15,29 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.translation.serialization;
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.type.Row;
-import java.util.Map;
+import java.io.IOException;
-public interface CatalogFactory {
+public interface RowSerialization<T> {
/**
- * Returns a unique identifier among same factory interfaces.
+ * Serializes the given object.
*
- * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
+ * @param seaTunnelRow The object to serialize.
+ * @return The engine-specific row.
+ * @throws IOException Thrown, if the serialization fails.
*/
- String factoryIdentifier();
+ T serialize(Row seaTunnelRow) throws IOException;
/**
- * Creates a {@link Catalog} using the options.
+ * De-serializes the given engine row.
+ *
+ * @param engineRow The internal engine row
+ * @return The SeaTunnel Row
+ * @throws IOException Thrown, if the deserialization fails.
*/
- Catalog createCatalog(String catalogName, Map<String, String> options);
+ Row deserialize(T engineRow) throws IOException;
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
similarity index 90%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 563d9fa0..2816df97 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -15,7 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
+
+public class CoordinatedSource {
-public interface SinkWriter {
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
similarity index 91%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 563d9fa0..975fbfa7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
-public interface SinkWriter {
+public class ParallelSource {
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
new file mode 100644
index 00000000..7c8c3900
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-translation-flink</artifactId>
+
+ <properties>
+ <flink.version>1.13.6</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- apache flink table -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
new file mode 100644
index 00000000..b45d100d
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -0,0 +1,19 @@
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
+
+import java.io.IOException;
+
+/**
+ * {@link RowSerialization} for Flink, converting between the SeaTunnel row type and
+ * {@link org.apache.flink.types.Row}.
+ *
+ * <p>NOTE(review): both methods are placeholders for now and always return {@code null}.
+ */
+public class FlinkRowSerialization implements RowSerialization<Row> {
+
+ /**
+ * Placeholder for converting a SeaTunnel row into a Flink row.
+ *
+ * @param seaTunnelRow the SeaTunnel row to convert
+ * @return always {@code null} in the current implementation
+ */
+ @Override
+ public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
+ return null;
+ }
+
+ /**
+ * Placeholder for converting a Flink row into a SeaTunnel row.
+ *
+ * @param engineRow the Flink row to convert
+ * @return always {@code null} in the current implementation
+ */
+ @Override
+ public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException {
+ return null;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml
new file mode 100644
index 00000000..c381244a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-translation-spark</artifactId>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file