You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/28 06:40:29 UTC

[incubator-seatunnel] branch api-draft created (now 5eb77fc3)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a change to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


      at 5eb77fc3 [Feature][core] base logic

This branch includes the following new commits:

     new 32592237 [Feature][core] base interface.
     new 5eb77fc3 [Feature][core] base logic

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-seatunnel] 02/02: [Feature][core] base logic

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 5eb77fc3050bf6306f25843aba6a2c3020e014fa
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Apr 28 10:29:44 2022 +0800

    [Feature][core] base logic
---
 pom.xml                                            |   1 +
 .../Serializer.java}                               |  26 ++--
 .../java/org/apache/seatunnel/api/sink/Sink.java   |  32 ++++-
 .../api/sink/SinkAggregatedCommitter.java          |  10 +-
 .../apache/seatunnel/api/sink/SinkCommitter.java   |  11 +-
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  44 +++++-
 .../org/apache/seatunnel/api/source/Collector.java |   5 +-
 .../org/apache/seatunnel/api/source/Source.java    |  18 ++-
 .../apache/seatunnel/api/source/SourceEvent.java   |   7 +-
 .../apache/seatunnel/api/source/SourceReader.java  |  41 +++++-
 .../apache/seatunnel/api/source/SourceSplit.java   |   4 +-
 .../api/source/SourceSplitEnumerator.java          |  68 +++++++++-
 .../{Collector.java => SupportCoordinate.java}     |   5 +-
 .../seatunnel/api/state/CheckpointListener.java    |   5 +
 .../seatunnel/api/state/StatefulOperator.java      |  21 ---
 .../apache/seatunnel/api/table/catalog/Column.java |  37 +++--
 .../api/table/catalog/TableIdentifier.java         |   2 +-
 .../seatunnel/api/table/catalog/TableSchema.java   |   4 +-
 .../table/catalog/exception/CatalogException.java  |  14 +-
 .../table/connector/SupportReadingMetadata.java    |   1 -
 .../seatunnel/api/table/connector/TableSink.java   |   2 +-
 .../seatunnel/api/table/connector/TableSource.java |   2 +-
 .../api/table/factory/CatalogFactory.java          |  11 +-
 .../factory/{CatalogFactory.java => Factory.java}  |  11 +-
 ...TableFactoryUtil.java => FactoryException.java} |   9 +-
 .../seatunnel/api/table/factory/FactoryUtil.java   | 150 +++++++++++++++++++++
 .../api/table/factory/SupportMultipleTable.java    |   2 +-
 ...{TableFactory.java => TableFactoryContext.java} |  51 ++++---
 .../api/table/factory/TableSinkFactory.java        |   4 +-
 .../api/table/factory/TableSourceFactory.java      |   4 +-
 seatunnel-translation/pom.xml                      |  21 +++
 .../seatunnel-translation-base/pom.xml             |  26 ++++
 .../serialization/RowSerialization.java            |  26 ++--
 .../translation/source/CoordinatedSource.java      |   5 +-
 .../translation/source/ParallelSource.java         |   4 +-
 .../seatunnel-translation-flink/pom.xml            |  32 +++++
 .../flink/serialization/FlinkRowSerialization.java |  19 +++
 .../seatunnel-translation-spark/pom.xml            |  24 ++++
 38 files changed, 630 insertions(+), 129 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5804f998..ed98a286 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
         <module>seatunnel-examples</module>
         <module>seatunnel-e2e</module>
         <module>seatunnel-api</module>
+        <module>seatunnel-translation</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
similarity index 57%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
index 9742f1df..61703486 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.api.serialization;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import java.io.IOException;
 
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Serializer<T> {
 
     /**
-     * Returns a unique identifier among same factory interfaces.
+     * Serializes the given object.
      *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
+     * @param obj The object to serialize.
+     * @return The serialized data (bytes).
+     * @throws IOException Thrown, if the serialization fails.
      */
-    String factoryIdentifier();
+    byte[] serialize(T obj) throws IOException;
 
     /**
-     * Creates a {@link Catalog} using the options.
+     * De-serializes the given data (bytes).
+     *
+     * @param serialized The serialized data
+     * @return The deserialized object
+     * @throws IOException Thrown, if the deserialization fails.
      */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
+    T deserialize(byte[] serialized) throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
index 9e1d73e6..2c1fedef 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -17,8 +17,38 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable {
+
+    SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException;
+
+    default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException {
+        return createWriter(context);
+    }
+
+    default Optional<Serializer<StateT>> getWriterStateSerializer() {
+        return Optional.empty();
+    }
+
+    default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
+        return Optional.empty();
+    }
+
+    default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
+        return Optional.empty();
+    }
 
-public interface Sink<IN> extends Serializable {
+    default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException {
+        return Optional.empty();
+    }
 
+    default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
+        return Optional.empty();
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index e4947fcb..1f38efa3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -17,5 +17,13 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkAggregatedCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> {
+
+    List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo)
+            throws IOException, InterruptedException;
+
+    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
index 176cca65..2dff2525 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -17,5 +17,14 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkCommitter {
+import java.io.IOException;
+import java.util.List;
+
+public interface SinkCommitter<CommitInfoT> {
+
+    List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException;
+
+    List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException;
+
+    void abort() throws Exception;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
index 563d9fa0..933bfeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -17,5 +17,47 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SinkWriter {
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface SinkWriter<T, StateT> {
+    void write(T element) throws IOException, InterruptedException;
+
+    /**
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
+     * @deprecated implement {@link #snapshotState(long)}
+     */
+    default List<StateT> snapshotState() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return The writer's state.
+     * @throws IOException if fail to snapshot writer's state.
+     */
+    default List<StateT> snapshotState(long checkpointId) throws IOException {
+        return snapshotState();
+    }
+
+    interface Context {
+
+        /**
+         * Gets the configuration with which Flink was started.
+         */
+        Map<String, String> getConfiguration();
+
+        /**
+         * @return The index of this subtask.
+         */
+        int getIndexOfSubtask();
+
+        /**
+         * @return The number of parallel Sink tasks.
+         */
+        int getNumberOfParallelSubtasks();
+
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index 5a84fbf7..f36a7f87 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -17,5 +17,8 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface Collector {
+public interface Collector<T> {
+
+    void collect(T record);
+
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
index 6dd2e74a..80cf9aaf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -17,17 +17,31 @@
 
 package org.apache.seatunnel.api.source;
 
+import org.apache.seatunnel.api.serialization.Serializer;
+
 import java.io.Serializable;
 
 /**
  * The interface for Source. It acts like a factory class that helps construct the {@link
  * SourceSplitEnumerator} and {@link SourceReader} and corresponding serializers.
  *
- * @param <T> The type of records produced by the source.
+ * @param <T>      The type of records produced by the source.
  * @param <SplitT> The type of splits handled by the source.
- * @param <StateT> The type of state to store.
  */
 public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
 
+    /**
+     * Get the boundedness of this source.
+     *
+     * @return the boundedness of this source.
+     */
+    Boundedness getBoundedness();
+
+    SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;
+
+    SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
+
+    SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception;
 
+    Serializer<StateT> getEnumeratorStateSerializer();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
index e1255b95..4d2374b4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -17,5 +17,10 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceEvent {
+import java.io.Serializable;
+
+/**
+ * An base class for the events passed between the SourceReaders and Enumerators.
+ */
+public interface SourceEvent extends Serializable {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
index 7e784f17..dff0527d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -17,9 +17,46 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceReader {
+import org.apache.seatunnel.api.state.CheckpointListener;
 
-    interface SupportCoordinate {
+import java.util.List;
+import java.util.Map;
 
+public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener {
+
+    void start(Collector<T> output) throws Exception;
+
+    List<SplitT> snapshotState(long checkpointId);
+
+    void addSplits(List<SplitT> splits);
+
+    default void handleSourceEvent(SourceEvent sourceEvent) {
+    }
+
+    interface Context {
+
+        /**
+         * Gets the configuration with which Flink was started.
+         */
+        Map<String, String> getConfiguration();
+
+        /**
+         * @return The index of this subtask.
+         */
+        int getIndexOfSubtask();
+
+        /**
+         * Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to
+         * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's
+         * parallel subtask id and the hostname where this reader runs.
+         */
+        void sendSplitRequest();
+
+        /**
+         * Send a source event to the source coordinator.
+         *
+         * @param sourceEvent the source event to coordinator.
+         */
+        void sendSourceEventToCoordinator(SourceEvent sourceEvent);
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
index e7b6b9b1..af6e1acf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.api.source;
 
-/** An interface for all the Split types to extend. */
+/**
+ * An interface for all the Split types to extend.
+ */
 public interface SourceSplit {
 
     /**
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 0dff0b1d..1c2291fa 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -17,9 +17,73 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface SourceSplitEnumerator {
+import org.apache.seatunnel.api.state.CheckpointListener;
 
-    interface SupportCoordinate {
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener {
+
+    void handleSplitRequest(int subtaskId, String requesterHostname);
+
+    void registerReader(int subtaskId);
+
+    StateT snapshotState(long checkpointId) throws Exception;
+
+    default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
     }
+
+    interface Context<SplitT extends SourceSplit> {
+
+        int currentParallelism();
+
+        /**
+         * Get the currently registered readers. The mapping is from subtask id to the reader info.
+         *
+         * @return the currently registered readers.
+         */
+        Set<Integer> registeredReaders();
+
+        /**
+         * Assign the splits.
+         *
+         * @param newSplitAssignments the new split assignments to add.
+         */
+        void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments);
+
+        /**
+         * Assigns a single split.
+         *
+         * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+         * call to the {@link #assignSplits} method.
+         *
+         * @param split   The new split
+         * @param subtask The index of the operator's parallel subtask that shall receive the split.
+         */
+        default void assignSplit(SplitT split, int subtask) {
+            Map<Integer, List<SplitT>> splits = new HashMap<>();
+            splits.put(subtask, Collections.singletonList(split));
+            assignSplits(splits);
+        }
+
+        /**
+         * Signals a subtask that it will not receive any further split.
+         *
+         * @param subtask The index of the operator's parallel subtask that shall be signaled it will
+         *                not receive any further split.
+         */
+        void signalNoMoreSplits(int subtask);
+
+        /**
+         * Send a source event to a source reader. The source reader is identified by its subtask id.
+         *
+         * @param subtaskId the subtask id of the source reader to send this event to.
+         * @param event     the source event to send.
+         */
+        void sendEventToSourceReader(int subtaskId, SourceEvent event);
+    }
+
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
similarity index 88%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
index 5a84fbf7..2de9a3af 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java
@@ -17,5 +17,8 @@
 
 package org.apache.seatunnel.api.source;
 
-public interface Collector {
+/**
+ * Used to mark whether the interface supports coordination.
+ */
+public interface SupportCoordinate {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
index a48d256c..8abf1398 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
@@ -18,4 +18,9 @@
 package org.apache.seatunnel.api.state;
 
 public interface CheckpointListener {
+
+    void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+    default void notifyCheckpointAborted(long checkpointId) throws Exception {
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
deleted file mode 100644
index fb09a761..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.state;
-
-public interface StatefulOperator {
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 7e6a72a3..a8211f3e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.seatunnel.api.table.catalog;
+
 import org.apache.seatunnel.api.table.type.DataType;
 
 import java.util.Objects;
@@ -36,7 +37,9 @@ public abstract class Column {
         this.comment = comment;
     }
 
-    /** Creates a regular table column that represents physical data. */
+    /**
+     * Creates a regular table column that represents physical data.
+     */
     public static PhysicalColumn physical(String name, DataType dataType) {
         return new PhysicalColumn(name, dataType);
     }
@@ -52,8 +55,10 @@ public abstract class Column {
         return new MetadataColumn(name, dataType, metadataKey);
     }
 
-    /** Add the comment to the column and return the new object. */
-    public abstract Column withComment( String comment);
+    /**
+     * Add the comment to the column and return the new object.
+     */
+    public abstract Column withComment(String comment);
 
     /**
      * Returns whether the given column is a physical column of a table; neither computed nor
@@ -61,22 +66,30 @@ public abstract class Column {
      */
     public abstract boolean isPhysical();
 
-    /** Returns the data type of this column. */
+    /**
+     * Returns the data type of this column.
+     */
     public DataType getDataType() {
         return this.dataType;
     }
 
-    /** Returns the name of this column. */
+    /**
+     * Returns the name of this column.
+     */
     public String getName() {
         return name;
     }
 
-    /** Returns the comment of this column. */
+    /**
+     * Returns the comment of this column.
+     */
     public Optional<String> getComment() {
         return Optional.ofNullable(comment);
     }
 
-    /** Returns a copy of the column with a replaced {@link DataType}. */
+    /**
+     * Returns a copy of the column with a replaced {@link DataType}.
+     */
     public abstract Column copy(DataType newType);
 
     @Override
@@ -102,7 +115,9 @@ public abstract class Column {
     // Specific kinds of columns
     // --------------------------------------------------------------------------------------------
 
-    /** Representation of a physical column. */
+    /**
+     * Representation of a physical column.
+     */
     public static final class PhysicalColumn extends Column {
 
         private PhysicalColumn(String name, DataType dataType) {
@@ -132,7 +147,9 @@ public abstract class Column {
         }
     }
 
-    /** Representation of a metadata column. */
+    /**
+     * Representation of a metadata column.
+     */
     public static final class MetadataColumn extends Column {
 
         private final String metadataKey;
@@ -156,7 +173,7 @@ public abstract class Column {
         }
 
         @Override
-        public MetadataColumn withComment( String comment) {
+        public MetadataColumn withComment(String comment) {
             if (comment == null) {
                 return this;
             }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index da7b02f3..e3c60ada 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -22,7 +22,7 @@ import java.util.Objects;
 
 public final class TableIdentifier implements Serializable {
     private static final long serialVersionUID = 1L;
-    
+
     private final String catalogName;
 
     private final String databaseName;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index ec47b0ff..ac46fd8a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -32,7 +32,9 @@ public final class TableSchema implements Serializable {
         return new TableSchema(columns);
     }
 
-    /** Returns all {@link Column}s of this schema. */
+    /**
+     * Returns all {@link Column}s of this schema.
+     */
     public List<Column> getColumns() {
         return columns;
     }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
index 31b526bc..d91e6670 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
@@ -17,22 +17,28 @@
 
 package org.apache.seatunnel.api.table.catalog.exception;
 
-/** A catalog-related, runtime exception. */
+/**
+ * A catalog-related, runtime exception.
+ */
 public class CatalogException extends RuntimeException {
 
-    /** @param message the detail message. */
+    /**
+     * @param message the detail message.
+     */
     public CatalogException(String message) {
         super(message);
     }
 
-    /** @param cause the cause. */
+    /**
+     * @param cause the cause.
+     */
     public CatalogException(Throwable cause) {
         super(cause);
     }
 
     /**
      * @param message the detail message.
-     * @param cause the cause.
+     * @param cause   the cause.
      */
     public CatalogException(String message, Throwable cause) {
         super(message, cause);
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index aef9ba52..3fc90e17 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 /**
  * Used for {@link TableSource} to support metadata columns.
- *
  */
 public interface SupportReadingMetadata {
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
index a7a9bf4e..fe0be552 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.sink.Sink;
 
 public interface TableSink {
 
-    Sink<?> createSink();
+    Sink<?, ?, ?, ?> createSink();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
index a1290f81..2267ab1f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -21,5 +21,5 @@ import org.apache.seatunnel.api.source.Source;
 
 public interface TableSource {
 
-    Source<?,?,?> createSource();
+    Source<?, ?, ?> createSource();
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
index 9742f1df..2448c77e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
@@ -21,16 +21,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 
 import java.util.Map;
 
-public interface CatalogFactory {
-
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+public interface CatalogFactory extends Factory {
 
     /**
      * Creates a {@link Catalog} using the options.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
similarity index 82%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
index 9742f1df..59b0fd51 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java
@@ -17,11 +17,7 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
-
-import java.util.Map;
-
-public interface CatalogFactory {
+public interface Factory {
 
     /**
      * Returns a unique identifier among same factory interfaces.
@@ -31,9 +27,4 @@ public interface CatalogFactory {
      * using "-" (e.g. {@code elasticsearch-7}).
      */
     String factoryIdentifier();
-
-    /**
-     * Creates a {@link Catalog} using the options.
-     */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
similarity index 78%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
index 67fcb835..64e5214c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java
@@ -17,6 +17,13 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-public final class TableFactoryUtil {
+public class FactoryException extends RuntimeException {
 
+    public FactoryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public FactoryException(String message) {
+        super(message);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
new file mode 100644
index 00000000..2b7ba445
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.sink.Sink;
+import org.apache.seatunnel.api.source.Source;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/**
+ * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and {@link CatalogFactory}.
+ */
+public final class FactoryUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+    public static List<Source> createAndPrepareSource(
+            List<CatalogTable> multipleTables,
+            Map<String, String> options,
+            ClassLoader classLoader,
+            String factoryIdentifier) {
+
+
+        try {
+            final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);
+            List<Source> sources = new ArrayList<>(multipleTables.size());
+            if (factory instanceof SupportMultipleTable) {
+                TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader);
+                SupportMultipleTable multipleTableSourceFactory = ((SupportMultipleTable) factory);
+                // TODO: create all source
+                SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context);
+                TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader));
+                // TODO: handle reading metadata
+                Source<?, ?, ?> source = multipleTableSource.createSource();
+                sources.add(source);
+            }
+            return sources;
+        } catch (Throwable t) {
+            throw new FactoryException(
+                    String.format(
+                            "Unable to create a source for identifier '%s'.", factoryIdentifier),
+                    t);
+        }
+    }
+
+    public static List<Sink> createAndPrepareSink() {
+        return null;
+    }
+
+    public static Catalog createCatalog(String catalogName,
+                                        Map<String, String> options,
+                                        ClassLoader classLoader,
+                                        String factoryIdentifier) {
+        CatalogFactory catalogFactory = discoverFactory(classLoader, CatalogFactory.class, factoryIdentifier);
+        return catalogFactory.createCatalog(catalogName, options);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Factory> T discoverFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
+        final List<Factory> factories = discoverFactories(classLoader);
+
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factories that implement '%s' in the classpath.",
+                            factoryClass.getName()));
+        }
+
+        final List<Factory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new FactoryException(
+                    String.format(
+                            "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
+                                    + "Available factory identifiers are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            foundFactories.stream()
+                                    .map(Factory::factoryIdentifier)
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        if (matchingFactories.size() > 1) {
+            throw new FactoryException(
+                    String.format(
+                            "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return (T) matchingFactories.get(0);
+    }
+
+    private static List<Factory> discoverFactories(ClassLoader classLoader) {
+        try {
+            final List<Factory> result = new LinkedList<>();
+            ServiceLoader.load(Factory.class, classLoader)
+                    .iterator()
+                    .forEachRemaining(result::add);
+            return result;
+        } catch (ServiceConfigurationError e) {
+            LOG.error("Could not load service provider for factories.", e);
+            throw new FactoryException("Could not load service provider for factories.", e);
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
index 7c34c354..6b15786a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
@@ -30,7 +30,7 @@ public interface SupportMultipleTable {
     /**
      * A connector can pick tables and return the accepted and remaining tables.
      */
-    Result applyTables(TableFactory.Context context);
+    Result applyTables(TableFactoryContext context);
 
     final class Result {
         private final List<CatalogTable> acceptedTables;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
similarity index 51%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index b531e71e..6caf0b89 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -22,31 +22,38 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import java.util.List;
 import java.util.Map;
 
-public interface TableFactory {
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+public class TableFactoryContext {
+
+    private final List<CatalogTable> catalogTables;
+    private final Map<String, String> options;
+    private final ClassLoader classLoader;
+
+    public TableFactoryContext(
+            List<CatalogTable> catalogTables,
+            Map<String, String> options,
+            ClassLoader classLoader) {
+        this.catalogTables = catalogTables;
+        this.options = options;
+        this.classLoader = classLoader;
+    }
 
-    /** Provides information describing the multi-table to be accessed. */
-    interface Context {
 
-        ClassLoader getClassLoader();
+    public ClassLoader getClassLoader() {
+        return this.classLoader;
+    }
 
-        /**
-         * Returns a list of tables that need to be processed.
-         *
-         * <p> By default, return only single table.
-         *
-         * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
-         */
-        List<CatalogTable> getCatalogTable();
+    /**
+     * Returns a list of tables that need to be processed.
+     *
+     * <p> By default, return only single table.
+     *
+     * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
+     */
+    public List<CatalogTable> getCatalogTables() {
+        return catalogTables;
+    }
 
-        /** Gives read-only access to the configuration of the current session. */
-        Map<String, String> getOptions();
+    public Map<String, String> getOptions() {
+        return this.options;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index 33319659..bb92a76a 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.table.connector.TableSink;
 
-public interface TableSinkFactory extends TableFactory {
+public interface TableSinkFactory extends Factory {
 
-    TableSink createSink(TableFactory.Context context);
+    TableSink createSink(TableFactoryContext context);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 8318ab3a..241deeb0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.table.connector.TableSource;
 
-public interface TableSourceFactory extends TableFactory {
+public interface TableSourceFactory extends Factory {
 
-    TableSource createSource(TableFactory.Context context);
+    TableSource createSource(TableFactoryContext context);
 }
diff --git a/seatunnel-translation/pom.xml b/seatunnel-translation/pom.xml
new file mode 100644
index 00000000..3a44957b
--- /dev/null
+++ b/seatunnel-translation/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>seatunnel-translation-base</module>
+        <module>seatunnel-translation-flink</module>
+        <module>seatunnel-translation-spark</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-base/pom.xml b/seatunnel-translation/seatunnel-translation-base/pom.xml
new file mode 100644
index 00000000..f9bbaff4
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-base/pom.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-base</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
similarity index 55%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
index 9742f1df..06bd52e9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java
@@ -15,25 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.factory;
+package org.apache.seatunnel.translation.serialization;
 
-import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.type.Row;
 
-import java.util.Map;
+import java.io.IOException;
 
-public interface CatalogFactory {
+public interface RowSerialization<T> {
 
     /**
-     * Returns a unique identifier among same factory interfaces.
+     * Serializes the given object.
      *
-     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
+     * @param seaTunnelRow The object to serialize.
+     * @return The serialized data (bytes).
+     * @throws IOException Thrown, if the serialization fails.
      */
-    String factoryIdentifier();
+    T serialize(Row seaTunnelRow) throws IOException;
 
     /**
-     * Creates a {@link Catalog} using the options.
+     * De-serializes the given data (bytes).
+     *
+     * @param engineRow The internal engine row
+     * @return The SeaTunnel Row
+     * @throws IOException Thrown, if the deserialization fails.
      */
-    Catalog createCatalog(String catalogName, Map<String, String> options);
+    Row deserialize(T engineRow) throws IOException;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
similarity index 90%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 563d9fa0..2816df97 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -15,7 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
+
+public class CoordinatedSource {
 
-public interface SinkWriter {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
similarity index 91%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 563d9fa0..975fbfa7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.translation.source;
 
-public interface SinkWriter {
+public class ParallelSource {
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
new file mode 100644
index 00000000..7c8c3900
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-flink</artifactId>
+
+    <properties>
+        <flink.version>1.13.6</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- apache flink table -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
new file mode 100644
index 00000000..b45d100d
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -0,0 +1,19 @@
+package org.apache.seatunnel.translation.flink.serialization;
+
+import org.apache.flink.types.Row;
+import org.apache.seatunnel.translation.serialization.RowSerialization;
+
+import java.io.IOException;
+
+public class FlinkRowSerialization implements RowSerialization<Row> {
+
+    @Override
+    public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
+        return null;
+    }
+
+    @Override
+    public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException {
+        return null;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml
new file mode 100644
index 00000000..c381244a
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-translation-spark</artifactId>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file


[incubator-seatunnel] 01/02: [Feature][core] base interface.

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 325922374bdadc85840100cadba192a87df5e7e8
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Tue Mar 29 23:36:39 2022 +0800

    [Feature][core] base interface.
---
 pom.xml                                            |  11 ++
 seatunnel-api/pom.xml                              |  23 +++
 .../java/org/apache/seatunnel/api/sink/Sink.java   |  24 +++
 .../api/sink/SinkAggregatedCommitter.java          |  21 +++
 .../apache/seatunnel/api/sink/SinkCommitter.java   |  21 +++
 .../org/apache/seatunnel/api/sink/SinkWriter.java  |  21 +++
 .../apache/seatunnel/api/source/Boundedness.java   |  29 +++
 .../org/apache/seatunnel/api/source/Collector.java |  21 +++
 .../org/apache/seatunnel/api/source/Source.java    |  33 ++++
 .../apache/seatunnel/api/source/SourceEvent.java   |  21 +++
 .../apache/seatunnel/api/source/SourceReader.java  |  25 +++
 .../apache/seatunnel/api/source/SourceSplit.java   |  29 +++
 .../api/source/SourceSplitEnumerator.java          |  25 +++
 .../seatunnel/api/state/CheckpointListener.java    |  21 +++
 .../java/org/apache/seatunnel/api/state/State.java |  21 +++
 .../seatunnel/api/state/StatefulOperator.java      |  21 +++
 .../seatunnel/api/table/catalog/Catalog.java       |  21 +++
 .../seatunnel/api/table/catalog/CatalogTable.java  |  78 ++++++++
 .../apache/seatunnel/api/table/catalog/Column.java | 196 +++++++++++++++++++++
 .../api/table/catalog/TableIdentifier.java         |  81 +++++++++
 .../seatunnel/api/table/catalog/TablePath.java     |  86 +++++++++
 .../seatunnel/api/table/catalog/TableSchema.java   |  39 ++++
 .../table/catalog/exception/CatalogException.java  |  40 +++++
 .../table/connector/SupportReadingMetadata.java    |  35 ++++
 .../seatunnel/api/table/connector/TableSink.java   |  25 +++
 .../seatunnel/api/table/connector/TableSource.java |  25 +++
 .../api/table/factory/CatalogFactory.java          |  39 ++++
 .../api/table/factory/SupportMultipleTable.java    |  60 +++++++
 .../seatunnel/api/table/factory/TableFactory.java  |  52 ++++++
 .../api/table/factory/TableFactoryUtil.java        |  22 +++
 .../api/table/factory/TableSinkFactory.java        |  25 +++
 .../api/table/factory/TableSourceFactory.java      |  25 +++
 .../apache/seatunnel/api/table/type/DataType.java  |  21 +++
 .../org/apache/seatunnel/api/table/type/Row.java   |  24 +++
 34 files changed, 1261 insertions(+)

diff --git a/pom.xml b/pom.xml
index a5a345c7..5804f998 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
         <module>seatunnel-dist</module>
         <module>seatunnel-examples</module>
         <module>seatunnel-e2e</module>
+        <module>seatunnel-api</module>
     </modules>
 
     <properties>
@@ -158,6 +159,9 @@
         <testcontainer.version>1.16.3</testcontainer.version>
         <skipUT>false</skipUT>
         <skipIT>true</skipIT>
+
+        <!-- logging -->
+        <slf4j.version>1.7.36</slf4j.version>
     </properties>
 
     <dependencyManagement>
@@ -547,6 +551,13 @@
                 <artifactId>scala-library</artifactId>
                 <version>${scala.version}</version>
             </dependency>
+
+            <!-- logging -->
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
new file mode 100644
index 00000000..17b387b3
--- /dev/null
+++ b/seatunnel-api/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-api</artifactId>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
new file mode 100644
index 00000000..9e1d73e6
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import java.io.Serializable;
+
+public interface Sink<IN> extends Serializable {
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
new file mode 100644
index 00000000..e4947fcb
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public interface SinkAggregatedCommitter {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
new file mode 100644
index 00000000..176cca65
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public interface SinkCommitter {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
new file mode 100644
index 00000000..563d9fa0
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public interface SinkWriter {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
new file mode 100644
index 00000000..bceb3cdb
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Boundedness.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+public enum Boundedness {
+    /**
+     * A BOUNDED stream is a stream with finite records.
+     */
+    BOUNDED,
+    /**
+     * A UNBOUNDED stream is a stream with infinite records.
+     */
+    UNBOUNDED
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
new file mode 100644
index 00000000..5a84fbf7
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+public interface Collector {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
new file mode 100644
index 00000000..6dd2e74a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+import java.io.Serializable;
+
+/**
+ * The interface for Source. It acts like a factory class that helps construct the {@link
+ * SourceSplitEnumerator} and {@link SourceReader} and corresponding serializers.
+ *
+ * @param <T> The type of records produced by the source.
+ * @param <SplitT> The type of splits handled by the source.
+ * @param <StateT> The type of state to store.
+ */
+public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable {
+
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
new file mode 100644
index 00000000..e1255b95
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+public interface SourceEvent {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
new file mode 100644
index 00000000..7e784f17
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+public interface SourceReader {
+
+    interface SupportCoordinate {
+
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
new file mode 100644
index 00000000..e7b6b9b1
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+/** An interface for all the Split types to extend. */
+public interface SourceSplit {
+
+    /**
+     * Get the split id of this source split.
+     *
+     * @return id of this source split.
+     */
+    String splitId();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
new file mode 100644
index 00000000..0dff0b1d
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.source;
+
+public interface SourceSplitEnumerator {
+
+    interface SupportCoordinate {
+
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
new file mode 100644
index 00000000..a48d256c
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.state;
+
+public interface CheckpointListener {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
new file mode 100644
index 00000000..d7e43d44
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/State.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.state;
+
+public interface State {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
new file mode 100644
index 00000000..fb09a761
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.state;
+
+public interface StatefulOperator {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
new file mode 100644
index 00000000..96ce0ab1
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+public interface Catalog {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
new file mode 100644
index 00000000..e4ec2e7c
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public final class CatalogTable implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final TableIdentifier tableId;
+    private final TableSchema tableSchema;
+    private final Map<String, String> options;
+    private final List<String> partitionKeys;
+    private final String comment;
+
+    public static CatalogTable of(
+            TableIdentifier tableId,
+            TableSchema tableSchema,
+            Map<String, String> options,
+            List<String> partitionKeys,
+            String comment) {
+        return new CatalogTable(
+                tableId,
+                tableSchema,
+                options,
+                partitionKeys,
+                comment);
+    }
+
+    private CatalogTable(
+            TableIdentifier tableId,
+            TableSchema tableSchema,
+            Map<String, String> options,
+            List<String> partitionKeys,
+            String comment) {
+        this.tableId = tableId;
+        this.tableSchema = tableSchema;
+        this.options = options;
+        this.partitionKeys = partitionKeys;
+        this.comment = comment;
+    }
+
+    public TableIdentifier getTableId() {
+        return tableId;
+    }
+
+    public TableSchema getTableSchema() {
+        return tableSchema;
+    }
+
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
new file mode 100644
index 00000000..7e6a72a3
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+import org.apache.seatunnel.api.table.type.DataType;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public abstract class Column {
+
+    protected final String name;
+
+    protected final DataType dataType;
+
+    protected final String comment;
+
+    private Column(String name, DataType dataType, String comment) {
+        this.name = name;
+        this.dataType = dataType;
+        this.comment = comment;
+    }
+
+    /** Creates a regular table column that represents physical data. */
+    public static PhysicalColumn physical(String name, DataType dataType) {
+        return new PhysicalColumn(name, dataType);
+    }
+
+    /**
+     * Creates a metadata column from metadata of the given column name or from metadata of the
+     * given key (if not null).
+     *
+     * <p>Allows to specify whether the column is virtual or not.
+     */
+    public static MetadataColumn metadata(
+            String name, DataType dataType, String metadataKey) {
+        return new MetadataColumn(name, dataType, metadataKey);
+    }
+
+    /** Add the comment to the column and return the new object. */
+    public abstract Column withComment( String comment);
+
+    /**
+     * Returns whether the given column is a physical column of a table; neither computed nor
+     * metadata.
+     */
+    public abstract boolean isPhysical();
+
+    /** Returns the data type of this column. */
+    public DataType getDataType() {
+        return this.dataType;
+    }
+
+    /** Returns the name of this column. */
+    public String getName() {
+        return name;
+    }
+
+    /** Returns the comment of this column. */
+    public Optional<String> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    /** Returns a copy of the column with a replaced {@link DataType}. */
+    public abstract Column copy(DataType newType);
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Column that = (Column) o;
+        return Objects.equals(this.name, that.name)
+                && Objects.equals(this.dataType, that.dataType)
+                && Objects.equals(this.comment, that.comment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.name, this.dataType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Specific kinds of columns
+    // --------------------------------------------------------------------------------------------
+
+    /** Representation of a physical column. */
+    public static final class PhysicalColumn extends Column {
+
+        private PhysicalColumn(String name, DataType dataType) {
+            this(name, dataType, null);
+        }
+
+        private PhysicalColumn(String name, DataType dataType, String comment) {
+            super(name, dataType, comment);
+        }
+
+        @Override
+        public PhysicalColumn withComment(String comment) {
+            if (comment == null) {
+                return this;
+            }
+            return new PhysicalColumn(name, dataType, comment);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return true;
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new PhysicalColumn(name, newDataType, comment);
+        }
+    }
+
+    /** Representation of a metadata column. */
+    public static final class MetadataColumn extends Column {
+
+        private final String metadataKey;
+
+        private MetadataColumn(
+                String name, DataType dataType, String metadataKey) {
+            this(name, dataType, metadataKey, null);
+        }
+
+        private MetadataColumn(
+                String name,
+                DataType dataType,
+                String metadataKey,
+                String comment) {
+            super(name, dataType, comment);
+            this.metadataKey = metadataKey;
+        }
+
+        public Optional<String> getMetadataKey() {
+            return Optional.ofNullable(metadataKey);
+        }
+
+        @Override
+        public MetadataColumn withComment( String comment) {
+            if (comment == null) {
+                return this;
+            }
+            return new MetadataColumn(name, dataType, metadataKey, comment);
+        }
+
+        @Override
+        public boolean isPhysical() {
+            return false;
+        }
+
+        @Override
+        public Column copy(DataType newDataType) {
+            return new MetadataColumn(name, newDataType, metadataKey, comment);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            MetadataColumn that = (MetadataColumn) o;
+            return Objects.equals(metadataKey, that.metadataKey);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), metadataKey);
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
new file mode 100644
index 00000000..da7b02f3
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class TableIdentifier implements Serializable {
+    private static final long serialVersionUID = 1L;
+    
+    private final String catalogName;
+
+    private final String databaseName;
+
+    private final String tableName;
+
+    private TableIdentifier(String catalogName, String databaseName, String tableName) {
+        this.catalogName = catalogName;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    public static TableIdentifier of(String catalogName, String databaseName, String tableName) {
+        return new TableIdentifier(catalogName, databaseName, tableName);
+    }
+
+    public String getCatalogName() {
+        return catalogName;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String gettableName() {
+        return tableName;
+    }
+
+    public TablePath toTablePath() {
+        return TablePath.of(databaseName, tableName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TableIdentifier that = (TableIdentifier) o;
+        return catalogName.equals(that.catalogName)
+                && databaseName.equals(that.databaseName)
+                && tableName.equals(that.tableName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalogName, databaseName, tableName);
+    }
+
+    @Override
+    public String toString() {
+        return String.join(".", catalogName, databaseName, tableName);
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
new file mode 100644
index 00000000..61f61d93
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class TablePath implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final String databaseName;
+    private final String tableName;
+
+    private TablePath(String databaseName, String tableName) {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    public static TablePath of(String fullName) {
+        String[] paths = fullName.split("\\.");
+
+        if (paths.length != 2) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot get split '%s' to get databaseName and tableName", fullName));
+        }
+
+        return new TablePath(paths[0], paths[1]);
+    }
+
+    public static TablePath of(String databaseName, String tableName) {
+        return new TablePath(databaseName, tableName);
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getFullName() {
+        return String.format("%s.%s", databaseName, tableName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TablePath that = (TablePath) o;
+
+        return Objects.equals(databaseName, that.databaseName)
+                && Objects.equals(tableName, that.tableName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(databaseName, tableName);
+    }
+
+    @Override
+    public String toString() {
+        return getFullName();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
new file mode 100644
index 00000000..ec47b0ff
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import java.io.Serializable;
+import java.util.List;
+
+public final class TableSchema implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final List<Column> columns;
+
+    private TableSchema(List<Column> columns) {
+        this.columns = columns;
+    }
+
+    public static TableSchema of(List<Column> columns) {
+        return new TableSchema(columns);
+    }
+
+    /** Returns all {@link Column}s of this schema. */
+    public List<Column> getColumns() {
+        return columns;
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
new file mode 100644
index 00000000..31b526bc
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog.exception;
+
+/** A catalog-related, runtime exception. */
+public class CatalogException extends RuntimeException {
+
+    /** @param message the detail message. */
+    public CatalogException(String message) {
+        super(message);
+    }
+
+    /** @param cause the cause. */
+    public CatalogException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message.
+     * @param cause the cause.
+     */
+    public CatalogException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
new file mode 100644
index 00000000..aef9ba52
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.DataType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used for {@link TableSource} to support metadata columns.
+ *
+ */
+public interface SupportReadingMetadata {
+
+    Map<String, DataType> listReadableMetadata(CatalogTable catalogTable);
+
+    void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType dataType);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
new file mode 100644
index 00000000..a7a9bf4e
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.sink.Sink;
+
+public interface TableSink {
+
+    Sink<?> createSink();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
new file mode 100644
index 00000000..a1290f81
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.connector;
+
+import org.apache.seatunnel.api.source.Source;
+
+public interface TableSource {
+
+    Source<?,?,?> createSource();
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
new file mode 100644
index 00000000..9742f1df
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+
+import java.util.Map;
+
+public interface CatalogFactory {
+
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String factoryIdentifier();
+
+    /**
+     * Creates a {@link Catalog} using the options.
+     */
+    Catalog createCatalog(String catalogName, Map<String, String> options);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
new file mode 100644
index 00000000..7c34c354
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import java.util.List;
+
+/**
+ * Used to declare that the connector can handle data from multiple tables.
+ * <p> The expansion of the {@link TableSourceFactory}.
+ */
+public interface SupportMultipleTable {
+
+    /**
+     * A connector can pick tables and return the accepted and remaining tables.
+     */
+    Result applyTables(TableFactory.Context context);
+
+    final class Result {
+        private final List<CatalogTable> acceptedTables;
+        private final List<CatalogTable> remainingTables;
+
+        private Result(
+                List<CatalogTable> acceptedTables,
+                List<CatalogTable> remainingTables) {
+            this.acceptedTables = acceptedTables;
+            this.remainingTables = remainingTables;
+        }
+
+        public static Result of(
+                List<CatalogTable> acceptedTables,
+                List<CatalogTable> remainingTables) {
+            return new Result(acceptedTables, remainingTables);
+        }
+
+        public List<CatalogTable> getAcceptedTables() {
+            return acceptedTables;
+        }
+
+        public List<CatalogTable> getRemainingTables() {
+            return remainingTables;
+        }
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
new file mode 100644
index 00000000..b531e71e
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+import java.util.List;
+import java.util.Map;
+
+public interface TableFactory {
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String factoryIdentifier();
+
+    /** Provides information describing the multi-table to be accessed. */
+    interface Context {
+
+        ClassLoader getClassLoader();
+
+        /**
+         * Returns a list of tables that need to be processed.
+         *
+         * <p> By default, return only single table.
+         *
+         * <p> If you need multiple tables, implement {@link SupportMultipleTable}.
+         */
+        List<CatalogTable> getCatalogTable();
+
+        /** Gives read-only access to the configuration of the current session. */
+        Map<String, String> getOptions();
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
new file mode 100644
index 00000000..67fcb835
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+public final class TableFactoryUtil {
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
new file mode 100644
index 00000000..33319659
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.TableSink;
+
+public interface TableSinkFactory extends TableFactory {
+
+    TableSink createSink(TableFactory.Context context);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
new file mode 100644
index 00000000..8318ab3a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.factory;
+
+import org.apache.seatunnel.api.table.connector.TableSource;
+
+public interface TableSourceFactory extends TableFactory {
+
+    TableSource createSource(TableFactory.Context context);
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
new file mode 100644
index 00000000..9d2d403a
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+public interface DataType {
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
new file mode 100644
index 00000000..f355cd52
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Row.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import java.io.Serializable;
+
+public final class Row implements Serializable {
+    private int tableId;
+}