You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/28 06:40:31 UTC

[incubator-seatunnel] 02/02: [Feature][core] base logic

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 5eb77fc3050bf6306f25843aba6a2c3020e014fa
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Apr 28 10:29:44 2022 +0800

    [Feature][core] base logic
---
 pom.xml                                            |   1 +
 .../Serializer.java}                               |  26 ++--
 .../java/org/apache/seatunnel/api/sink/Sink.java   |  32 ++++-
 .../api/sink/SinkAggregatedCommitter.java          |  10 +-
 .../apache/seatunnel/api/sink/SinkCommitter.java   |  11 +-
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  44 +++++-
 .../org/apache/seatunnel/api/source/Collector.java |   5 +-
 .../org/apache/seatunnel/api/source/Source.java    |  18 ++-
 .../apache/seatunnel/api/source/SourceEvent.java   |   7 +-
 .../apache/seatunnel/api/source/SourceReader.java  |  41 +++++-
 .../apache/seatunnel/api/source/SourceSplit.java   |   4 +-
 .../api/source/SourceSplitEnumerator.java          |  68 +++++++++-
 .../{Collector.java => SupportCoordinate.java}     |   5 +-
 .../seatunnel/api/state/CheckpointListener.java    |   5 +
 .../seatunnel/api/state/StatefulOperator.java      |  21 ---
 .../apache/seatunnel/api/table/catalog/Column.java |  37 +++--
 .../api/table/catalog/TableIdentifier.java         |   2 +-
 .../seatunnel/api/table/catalog/TableSchema.java   |   4 +-
 .../table/catalog/exception/CatalogException.java  |  14 +-
 .../table/connector/SupportReadingMetadata.java    |   1 -
 .../seatunnel/api/table/connector/TableSink.java   |   2 +-
 .../seatunnel/api/table/connector/TableSource.java |   2 +-
 .../api/table/factory/CatalogFactory.java          |  11 +-
 .../factory/{CatalogFactory.java => Factory.java}  |  11 +-
 ...TableFactoryUtil.java => FactoryException.java} |   9 +-
 .../seatunnel/api/table/factory/FactoryUtil.java   | 150 +++++++++++++++++++++
 .../api/table/factory/SupportMultipleTable.java    |   2 +-
 ...{TableFactory.java => TableFactoryContext.java} |  51 ++++---
 .../api/table/factory/TableSinkFactory.java        |   4 +-
 .../api/table/factory/TableSourceFactory.java      |   4 +-
 seatunnel-translation/pom.xml                      |  21 +++
 .../seatunnel-translation-base/pom.xml             |  26 ++++
 .../serialization/RowSerialization.java            |  26 ++--
 .../translation/source/CoordinatedSource.java      |   5 +-
 .../translation/source/ParallelSource.java         |   4 +-
 .../seatunnel-translation-flink/pom.xml            |  32 +++++
 .../flink/serialization/FlinkRowSerialization.java |  19 +++
 .../seatunnel-translation-spark/pom.xml            |  24 ++++
 38 files changed, 630 insertions(+), 129 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5804f998..ed98a286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
         <module>seatunnel-examples</module>
         <module>seatunnel-e2e</module>
         <module>seatunnel-api</module>
+        <module>seatunnel-translation</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
similarity index 57%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index 9742f1df..61703486 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.serialization;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import java.io.IOException;
 
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Serializer<T> {
 
     /**
-     * Returns a unique identifier among same factory interfaces.
+     * Serializes the given object.
      *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
+     * @param obj The object to serialize.
+     * @return The serialized data (bytes).
+     * @throws IOException Thrown, if the serialization fails.
      */
-    String factoryIdentifier();
+    byte[] serialize(T obj) throws IOException;
 
     /**
-     * Creates a {@link Catalog} using the options.
+     * De-serializes the given data (bytes).
+     *
+     * @param serialized The serialized data
+     * @return The deserialized object
+     * @throws IOException Thrown, if the deserialization fails.
      */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
+    T deserialize(byte[] serialized) throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
index 9e1d73e6..2c1fedef 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -17,8 +17,38 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+
+    SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException;
+
+    default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+        return createWriter(context);
+    }
+
+    default Optional<Serializer<StateT>> getWriterStateSerializer() {
+        return Optional.empty();
+    }
+
+    default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
+        return Optional.empty();
+    }
+
+    default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
+        return Optional.empty();
+    }
 
-public interface Sink<IN> extends Serializable {
+    default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException {
+        return Optional.empty();
+    }
 
+    default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
+        return Optional.empty();
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index e4947fcb..1f38efa3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -17,5 +17,13 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkAggregatedCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+
+    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
+            throws IOException, InterruptedException;
+
+    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 176cca65..2dff2525 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -17,5 +17,14 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkCommitter<CommitInfoT> {
+
+    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+
+    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+
+    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 563d9fa0..933bfeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -17,5 +17,47 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkWriter {
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface SinkWriter<T, StateT> {
+    void write(T element) throws IOException, InterruptedException;
+
+    /**
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
+     * @deprecated Implement {@link #snapshotState(long)} instead.
+     */
+    default List<StateT> snapshotState() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
+     */
+    default List<StateT> snapshotState(long checkpointId) throws IOException {
+        return snapshotState();
+    }
+
+    interface Context {
+
+        /**
+         * Gets the configuration with which the underlying engine (e.g. Flink) was started.
+         */
+        Map<String, String> getConfiguration();
+
+        /**
+         * @return The index of this subtask.
+         */
+        int getIndexOfSubtask();
+
+        /**
+         * @return The number of parallel Sink tasks.
+         */
+        int getNumberOfParallelSubtasks();
+
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index 5a84fbf7..f36a7f87 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,5 +17,8 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface Collector {
+public interface Collector<T> {
+
+    void collect(T record);
+
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
index 6dd2e74a..80cf9aaf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -17,17 +17,31 @@
 
 package org.apache.seatunnel.api.source;
 
+import org.apache.seatunnel.api.serialization.Serializer;
+
 import java.io.Serializable;
 
 /**
  * The interface for Source. It acts like a factory class that helps construct the {@link
  * SourceSplitEnumerator} and {@link SourceReader} and corresponding serializers.
  *
- * @param <T> The type of records produced by the source.
+ * @param <T>      The type of records produced by the source.
  * @param <SplitT> The type of splits handled by the source.
- * @param <StateT> The type of state to store.
  */
 public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
 
+    /**
+     * Get the boundedness of this source.
+     *
+     * @return the boundedness of this source.
+     */
+    Boundedness getBoundedness();
+
+    SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
+
+    SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
+
+    SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
 
+    Serializer<StateT> getEnumeratorStateSerializer();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
index e1255b95..4d2374b4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -17,5 +17,10 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceEvent {
+import java.io.Serializable;
+
+/**
+ * A base interface for the events passed between the SourceReaders and Enumerators.
+ */
+public interface SourceEvent extends Serializable {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index 7e784f17..dff0527d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -17,9 +17,46 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceReader {
+import org.apache.seatunnel.api.state.CheckpointListener;
 
-    interface SupportCoordinate {
+import java.util.List;
+import java.util.Map;
 
+public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener {
+
+    void start(Collector<T> output) throws Exception;
+
+    List<SplitT> snapshotState(long checkpointId);
+
+    void addSplits(List<SplitT> splits);
+
+    default void handleSourceEvent(SourceEvent sourceEvent) {
+    }
+
+    interface Context {
+
+        /**
+         * Gets the configuration with which the underlying engine (e.g. Flink) was started.
+         */
+        Map<String, String> getConfiguration();
+
+        /**
+         * @return The index of this subtask.
+         */
+        int getIndexOfSubtask();
+
+        /**
+         * Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to
+         * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's
+         * parallel subtask id and the hostname where this reader runs.
+         */
+        void sendSplitRequest();
+
+        /**
+         * Send a source event to the source coordinator.
+         *
+         * @param sourceEvent the source event to send to the coordinator.
+         */
+        void sendSourceEventToCoordinator(SourceEvent sourceEvent);
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
index e7b6b9b1..af6e1acf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.api.source;
 
-/** An interface for all the Split types to extend. */
+/**
+ * An interface for all the Split types to extend.
+ */
 public interface SourceSplit {
 
     /**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 0dff0b1d..1c2291fa 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -17,9 +17,73 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceSplitEnumerator {
+import org.apache.seatunnel.api.state.CheckpointListener;
 
-    interface SupportCoordinate {
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener {
+
+    void handleSplitRequest(int subtaskId, String requesterHostname);
+
+    void registerReader(int subtaskId);
+
+    StateT snapshotState(long checkpointId) throws Exception;
+
+    default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     }
+
+    interface Context<SplitT extends SourceSplit> {
+
+        int currentParallelism();
+
+        /**
+         * Get the subtask ids of the currently registered readers.
+         *
+         * @return the currently registered readers.
+         */
+        Set<Integer> registeredReaders();
+
+        /**
+         * Assign the splits.
+         *
+         * @param newSplitAssignments the new split assignments to add.
+         */
+        void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments);
+
+        /**
+         * Assigns a single split.
+         *
+         * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+         * call to the {@link #assignSplits} method.
+         *
+         * @param split   The new split
+         * @param subtask The index of the operator's parallel subtask that shall receive the split.
+         */
+        default void assignSplit(SplitT split, int subtask) {
+            Map<Integer, List<SplitT>> splits = new HashMap<>();
+            splits.put(subtask, Collections.singletonList(split));
+            assignSplits(splits);
+        }
+
+        /**
+         * Signals a subtask that it will not receive any further split.
+         *
+         * @param subtask The index of the operator's parallel subtask that shall be signaled it will
+         *                not receive any further split.
+         */
+        void signalNoMoreSplits(int subtask);
+
+        /**
+         * Send a source event to a source reader. The source reader is identified by its subtask id.
+         *
+         * @param subtaskId the subtask id of the source reader to send this event to.
+         * @param event     the source event to send.
+         */
+        void sendEventToSourceReader(int subtaskId, SourceEvent event);
+    }
+
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
similarity index 88%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
index 5a84fbf7..2de9a3af 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
@@ -17,5 +17,8 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface Collector {
+/**
+ * Used to mark whether the interface supports coordination.
+ */
+public interface SupportCoordinate {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
index a48d256c..8abf1398 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
@@ -18,4 +18,9 @@
 package org.apache.seatunnel.api.state;
 
 public interface CheckpointListener {
+
+    void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+    default void notifyCheckpointAborted(long checkpointId) throws Exception {
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
deleted file mode 100644
index fb09a761..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.state;
-
-public interface StatefulOperator {
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 7e6a72a3..a8211f3e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.seatunnel.api.table.catalog;
+
 import org.apache.seatunnel.api.table.type.DataType;
 
 import java.util.Objects;
@@ -36,7 +37,9 @@ public abstract class Column {
         this.comment = comment;
     }
 
-    /** Creates a regular table column that represents physical data. */
+    /**
+     * Creates a regular table column that represents physical data.
+     */
     public static PhysicalColumn physical(String name, DataType dataType) {
         return new PhysicalColumn(name, dataType);
     }
@@ -52,8 +55,10 @@ public abstract class Column {
         return new MetadataColumn(name, dataType, metadataKey);
     }
 
-    /** Add the comment to the column and return the new object. */
-    public abstract Column withComment( String comment);
+    /**
+     * Add the comment to the column and return the new object.
+     */
+    public abstract Column withComment(String comment);
 
     /**
      * Returns whether the given column is a physical column of a table; neither computed nor
@@ -61,22 +66,30 @@ public abstract class Column {
      */
     public abstract boolean isPhysical();
 
-    /** Returns the data type of this column. */
+    /**
+     * Returns the data type of this column.
+     */
     public DataType getDataType() {
         return this.dataType;
     }
 
-    /** Returns the name of this column. */
+    /**
+     * Returns the name of this column.
+     */
     public String getName() {
         return name;
     }
 
-    /** Returns the comment of this column. */
+    /**
+     * Returns the comment of this column.
+     */
     public Optional<String> getComment() {
         return Optional.ofNullable(comment);
     }
 
-    /** Returns a copy of the column with a replaced {@link DataType}. */
+    /**
+     * Returns a copy of the column with a replaced {@link DataType}.
+     */
     public abstract Column copy(DataType newType);
 
     @Override
@@ -102,7 +115,9 @@ public abstract class Column {
     // Specific kinds of columns
     // --------------------------------------------------------------------------------------------
 
-    /** Representation of a physical column. */
+    /**
+     * Representation of a physical column.
+     */
     public static final class PhysicalColumn extends Column {
 
         private PhysicalColumn(String name, DataType dataType) {
@@ -132,7 +147,9 @@ public abstract class Column {
         }
     }
 
-    /** Representation of a metadata column. */
+    /**
+     * Representation of a metadata column.
+     */
     public static final class MetadataColumn extends Column {
 
         private final String metadataKey;
@@ -156,7 +173,7 @@ public abstract class Column {
         }
 
         @Override
-        public MetadataColumn withComment( String comment) {
+        public MetadataColumn withComment(String comment) {
             if (comment == null) {
                 return this;
             }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index da7b02f3..e3c60ada 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -22,7 +22,7 @@ import java.util.Objects;
 
 public final class TableIdentifier implements Serializable {
     private static final long serialVersionUID = 1L;
-    
+
     private final String catalogName;
 
     private final String databaseName;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index ec47b0ff..ac46fd8a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -32,7 +32,9 @@ public final class TableSchema implements Serializable {
         return new TableSchema(columns);
     }
 
-    /** Returns all {@link Column}s of this schema. */
+    /**
+     * Returns all {@link Column}s of this schema.
+     */
     public List<Column> getColumns() {
         return columns;
     }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
index 31b526bc..d91e6670 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
@@ -17,22 +17,28 @@
 
 package org.apache.seatunnel.api.table.catalog.exception;
 
-/** A catalog-related, runtime exception. */
+/**
+ * A catalog-related, runtime exception.
+ */
 public class CatalogException extends RuntimeException {
 
-    /** @param message the detail message. */
+    /**
+     * @param message the detail message.
+     */
     public CatalogException(String message) {
         super(message);
     }
 
-    /** @param cause the cause. */
+    /**
+     * @param cause the cause.
+     */
     public CatalogException(Throwable cause) {
         super(cause);
     }
 
     /**
      * @param message the detail message.
-     * @param cause the cause.
+     * @param cause   the cause.
      */
     public CatalogException(String message, Throwable cause) {
         super(message, cause);
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index aef9ba52..3fc90e17 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 /**
  * Used for {@link TableSource} to support metadata columns.
- *
  */
 public interface SupportReadingMetadata {
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index a7a9bf4e..fe0be552 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.sink.Sink;
 
 public interface TableSink {
 
-    Sink<?> createSink();
+    Sink<?, ?, ?, ?> createSink();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index a1290f81..2267ab1f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.source.Source;
 
 public interface TableSource {
 
-    Source<?,?,?> createSource();
+    Source<?, ?, ?> createSource();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
index 9742f1df..2448c77e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
@@ -21,16 +21,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 
 import java.util.Map;
 
-public interface CatalogFactory {
-
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+public interface CatalogFactory extends Factory {
 
     /**
      * Creates a {@link Catalog} using the options.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
similarity index 82%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 9742f1df..59b0fd51 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,11 +17,7 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
-
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Factory {
 
     /**
      * Returns a unique identifier among same factory interfaces.
@@ -31,9 +27,4 @@ public interface CatalogFactory {
      * using "-" (e.g. {@code elasticsearch-7}).
      */
     String factoryIdentifier();
-
-    /**
-     * Creates a {@link Catalog} using the options.
-     */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
similarity index 78%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
index 67fcb835..64e5214c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
@@ -17,6 +17,13 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-public final class TableFactoryUtil {
+public class FactoryException extends RuntimeException {
 
+    public FactoryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FactoryException(String message) {
+        super(message);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
new file mode 100644
index 00000000..2b7ba445
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/**
+ * Uses SPI to discover {@link TableSourceFactory}, {@link TableSinkFactory} and {@link CatalogFactory} implementations.
+ */
+public final class FactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+    public static List<Source> createAndPrepareSource(
+            List<CatalogTable> multipleTables,
+            Map<String, String> options,
+            ClassLoader classLoader,
+            String factoryIdentifier) {
+
+
+        try {
+            final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
+            List<Source> sources = new ArrayList<>(multipleTables.size());
+            if (factory instanceof SupportMultipleTable) {
+                TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
+                SupportMultipleTable multipleTableSourceFactory = ((SupportMultipleTable) factory);
+                // TODO: create all source
+                SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
+                TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
+                // TODO: handle reading metadata
+                Source<?, ?, ?> source = multipleTableSource.createSource();
+                sources.add(source);
+            }
+            return sources;
+        } catch (Throwable t) {
+            throw new FactoryException(
+                    String.format(
+                            "Unable to create a source for identifier '%s'.", factoryIdentifier),
+                    t);
+        }
+    }
+
+    public static List<Sink> createAndPrepareSink() {
+        return null;
+    }
+
+    public static Catalog createCatalog(String catalogName,
+                                        Map<String, String> options,
+                                        ClassLoader classLoader,
+                                        String factoryIdentifier) {
+        CatalogFactory catalogFactory = discoverFactory(classLoader, CatalogFactory.class, factoryIdentifier);
+        return catalogFactory.createCatalog(catalogName, options);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Factory> T discoverFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
+        final List<Factory> factories = discoverFactories(classLoader);
+
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factories that implement '%s' in the classpath.",
+                            factoryClass.getName()));
+        }
+
+        final List<Factory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
+                                    + "Available factory identifiers are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            foundFactories.stream()
+                                    .map(Factory::factoryIdentifier)
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        if (matchingFactories.size() > 1) {
+            throw new FactoryException(
+                    String.format(
+                            "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return (T) matchingFactories.get(0);
+    }
+
+    private static List<Factory> discoverFactories(ClassLoader classLoader) {
+        try {
+            final List<Factory> result = new LinkedList<>();
+            ServiceLoader.load(Factory.class, classLoader)
+                    .iterator()
+                    .forEachRemaining(result::add);
+            return result;
+        } catch (ServiceConfigurationError e) {
+            LOG.error("Could not load service provider for factories.", e);
+            throw new FactoryException("Could not load service provider for factories.", e);
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
index 7c34c354..6b15786a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
@@ -30,7 +30,7 @@ public interface SupportMultipleTable {
     /**
      * A connector can pick tables and return the accepted and remaining tables.
      */
-    Result applyTables(TableFactory.Context context);
+    Result applyTables(TableFactoryContext context);
 
     final class Result {
         private final List<CatalogTable> acceptedTables;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
similarity index 51%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index b531e71e..6caf0b89 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -22,31 +22,38 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import java.util.List;
 import java.util.Map;
 
-public interface TableFactory {
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+public class TableFactoryContext {
+
+    private final List<CatalogTable> catalogTables;
+    private final Map<String, String> options;
+    private final ClassLoader classLoader;
+
+    public TableFactoryContext(
+            List<CatalogTable> catalogTables,
+            Map<String, String> options,
+            ClassLoader classLoader) {
+        this.catalogTables = catalogTables;
+        this.options = options;
+        this.classLoader = classLoader;
+    }
 
-    /** Provides information describing the multi-table to be accessed. */
-    interface Context {
 
-        ClassLoader getClassLoader();
+    public ClassLoader getClassLoader() {
+        return this.classLoader;
+    }
 
-        /**
-         * Returns a list of tables that need to be processed.
-         *
-         * <p> By default, return only single table.
-         *
-         * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
-         */
-        List<CatalogTable> getCatalogTable();
+    /**
+     * Returns a list of tables that need to be processed.
+     *
+     * <p> By default, only a single table is returned.
+     *
+     * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
+     */
+    public List<CatalogTable> getCatalogTables() {
+        return catalogTables;
+    }
 
-        /** Gives read-only access to the configuration of the current session. */
-        Map<String, String> getOptions();
+    public Map<String, String> getOptions() {
+        return this.options;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 33319659..bb92a76a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.table.connector.TableSink;
 
-public interface TableSinkFactory extends TableFactory {
+public interface TableSinkFactory extends Factory {
 
-    TableSink createSink(TableFactory.Context context);
+    TableSink createSink(TableFactoryContext context);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 8318ab3a..241deeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.table.connector.TableSource;
 
-public interface TableSourceFactory extends TableFactory {
+public interface TableSourceFactory extends Factory {
 
-    TableSource createSource(TableFactory.Context context);
+    TableSource createSource(TableFactoryContext context);
 }
diff --git a/seatunnel-translation/pom.xml b/seatunnel-translation/pom.xml
new file mode 100644
index 00000000..3a44957b
--- /dev/null
+++ b/seatunnel-translation/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>seatunnel-translation-base</module>
+        <module>seatunnel-translation-flink</module>
+        <module>seatunnel-translation-spark</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-base/pom.xml b/seatunnel-translation/seatunnel-translation-base/pom.xml
new file mode 100644
index 00000000..f9bbaff4
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-base</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
similarity index 55%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
index 9742f1df..06bd52e9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
@@ -15,25 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.translation.serialization;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.type.Row;
 
-import java.util.Map;
+import java.io.IOException;
 
-public interface CatalogFactory {
+public interface RowSerialization<T> {
 
     /**
-     * Returns a unique identifier among same factory interfaces.
+     * Serializes the given SeaTunnel row into an internal engine row.
      *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
+     * @param seaTunnelRow The SeaTunnel row to serialize.
+     * @return The internal engine row.
+     * @throws IOException Thrown, if the serialization fails.
      */
-    String factoryIdentifier();
+    T serialize(Row seaTunnelRow) throws IOException;
 
     /**
-     * Creates a {@link Catalog} using the options.
+     * De-serializes the given internal engine row into a SeaTunnel row.
+     *
+     * @param engineRow The internal engine row
+     * @return The SeaTunnel Row
+     * @throws IOException Thrown, if the deserialization fails.
      */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
+    Row deserialize(T engineRow) throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
similarity index 90%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 563d9fa0..2816df97 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -15,7 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
+
+public class CoordinatedSource {
 
-public interface SinkWriter {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
similarity index 91%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 563d9fa0..975fbfa7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
 
-public interface SinkWriter {
+public class ParallelSource {
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
new file mode 100644
index 00000000..7c8c3900
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-flink</artifactId>
+
+    <properties>
+        <flink.version>1.13.6</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- apache flink table -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
new file mode 100644
index 00000000..b45d100d
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -0,0 +1,19 @@
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
+
+import java.io.IOException;
+
+public class FlinkRowSerialization implements RowSerialization<Row> {
+
+    @Override
+    public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
+        return null;
+    }
+
+    @Override
+    public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException {
+        return null;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml
new file mode 100644
index 00000000..c381244a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-spark</artifactId>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file