You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/01/31 12:41:05 UTC

[incubator-seatunnel] branch dev updated: [Feature][Core] Support flink 1.16 (#3979)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66441942f [Feature][Core] Support flink 1.16 (#3979)
66441942f is described below

commit 66441942f19cefc11532ff9b5be0d11dcb14e606
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Tue Jan 31 20:40:59 2023 +0800

    [Feature][Core] Support flink 1.16 (#3979)
    
    * [Feature][Core][Flink] Refactor flink 13 translation and flink 13 starter
    
    * [Feature][Core][Flink] Optimize pom
    
    * [Feature][Core][Flink] Optimize pom
    
    * [Feature][Core][Flink] Fix dependency
    
    * [Feature][Core][Flink] Support flink 15+
    
    * [Feature][Core][Flink] Update pom of seatunnel-dist
    
    * [Feature][Core][Flink] Update assembly-xml of seatunnel-dist
    
    * [Feature][Core][Flink] Update engine type
    
    * [Feature][Core][Flink] Update container config
    
    * [Feature][Core][Flink] Remove useless dependency
    
    * [Feature][Core][Flink] Add starter dependency in e2e modules
    
    * [Feature][Core][Flink] Update pom of seatunnel-flink-examples
    
    * [Feature][Core][Flink] Update pom of seatunnel-flink-connector-v2-e2e
    
    * [Feature][Core][Flink] Fix old e2e
    
    * [Feature][Core][Flink] Add flink 15 16 container
    
    * [Feature][Core][Flink] trigger CI
    
    * [Feature][Core][Flink] Fix type error
    
    * [Feature][Core][Flink] Add some comments for the code
---
 pom.xml                                            |   1 +
 .../seatunnel/core/starter/enums/EngineType.java   |   3 +-
 seatunnel-core/seatunnel-flink-starter/pom.xml     | 163 +------------
 .../seatunnel-flink-13-starter}/pom.xml            |  36 ++-
 .../bin/start-seatunnel-flink-13-connector-v2.sh}  |   2 +-
 .../seatunnel/core/starter/flink/FlinkStarter.java |   4 +-
 .../core/starter/flink/SeaTunnelFlink.java         |   2 +-
 .../core/starter/flink/args/FlinkCommandArgs.java  |   0
 .../flink/command/FlinkConfValidateCommand.java    |   0
 .../flink/command/FlinkTaskExecuteCommand.java     |   0
 .../FlinkAbstractPluginExecuteProcessor.java       |   2 +-
 .../starter/flink/execution/FlinkExecution.java    |   0
 .../flink/execution/FlinkRuntimeEnvironment.java   |   6 +-
 .../flink/execution/SinkExecuteProcessor.java      |   3 +-
 .../flink/execution/SourceExecuteProcessor.java    |   0
 .../flink/execution/TransformExecuteProcessor.java |   0
 .../core/starter/flink/utils}/ConfigKeyName.java   |   2 +-
 .../core/starter/flink/utils}/EnvironmentUtil.java |   2 +-
 .../core/starter/flink/utils}/TableUtil.java       |   2 +-
 .../seatunnel-flink-15-starter}/pom.xml            |  34 ++-
 .../bin/start-seatunnel-flink-15-connector-v2.sh}  |   2 +-
 .../seatunnel/core/starter/flink/FlinkStarter.java |   4 +-
 .../core/starter/flink/SeaTunnelFlink.java}        |  25 +-
 .../core/starter/flink/args/FlinkCommandArgs.java  |   0
 .../flink/command/FlinkConfValidateCommand.java    |   0
 .../flink/command/FlinkTaskExecuteCommand.java     |   0
 .../FlinkAbstractPluginExecuteProcessor.java       |   2 +-
 .../starter/flink/execution/FlinkExecution.java    |   0
 .../flink/execution/FlinkRuntimeEnvironment.java   |   7 +-
 .../flink/execution/SinkExecuteProcessor.java      |   7 +-
 .../flink/execution/SourceExecuteProcessor.java    |   0
 .../flink/execution/TransformExecuteProcessor.java |   0
 .../core/starter/flink/utils}/ConfigKeyName.java   |   2 +-
 .../core/starter/flink/utils}/EnvironmentUtil.java |   2 +-
 .../core/starter/flink/utils}/TableUtil.java       |   2 +-
 .../src/main/docker/Dockerfile                     |  37 ---
 .../core/starter/flink/util/SchemaUtil.java        | 267 ---------------------
 .../core/starter/flink/FlinkStarterTest.java       |  67 ------
 seatunnel-dist/pom.xml                             |  16 +-
 .../src/main/assembly/assembly-bin-ci.xml          |  10 +-
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  10 +-
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |  14 +-
 .../e2e/common/container/TestContainerId.java      |   2 +
 .../common/container/flink/Flink13Container.java   |   6 +-
 .../common/container/flink/Flink14Container.java   |   6 +-
 ...Flink13Container.java => Flink15Container.java} |  14 +-
 ...Flink13Container.java => Flink16Container.java} |  14 +-
 .../seatunnel/e2e/common/util/ContainerUtil.java   |   4 +-
 .../apache/seatunnel/e2e/flink/FlinkContainer.java |   6 +-
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   2 +-
 seatunnel-e2e/seatunnel-transforms-v2-e2e/pom.xml  |   8 +-
 .../seatunnel-flink-connector-v2-example/pom.xml   |   3 +-
 .../seatunnel-translation-flink/pom.xml            |  26 +-
 .../{ => seatunnel-translation-flink-13}/pom.xml   |  13 +-
 .../serialization/CommitWrapperSerializer.java     |   7 +
 .../flink/serialization/FlinkRowConverter.java     |   4 +
 .../FlinkSimpleVersionedSerializer.java            |   7 +
 .../serialization/FlinkWriterStateSerializer.java  |   7 +
 .../flink/serialization/KryoTypeInfo.java          |   0
 .../flink/serialization/WrappedRow.java            |   0
 .../translation/flink/sink/CommitWrapper.java      |   5 +
 .../translation/flink/sink/FlinkCommitter.java     |   6 +
 .../flink/sink/FlinkGlobalCommitter.java           |   8 +
 .../translation/flink/sink/FlinkSink.java          |   7 +
 .../translation/flink/sink/FlinkSinkWriter.java    |   8 +
 .../translation/flink/sink/FlinkWriterState.java   |   4 +
 .../flink/source/BaseSeaTunnelSourceFunction.java  |   3 +
 .../translation/flink/source/RowCollector.java     |   0
 .../flink/source/SeaTunnelCoordinatedSource.java   |   3 +
 .../flink/source/SeaTunnelParallelSource.java      |   3 +
 .../flink/utils/TypeConverterUtils.java            |   1 +
 .../flink/utils/TypeConverterUtilsTest.java        |   0
 .../{ => seatunnel-translation-flink-15}/pom.xml   |  28 ++-
 .../serialization/CommitWrapperSerializer.java     |   7 +
 .../flink/serialization/FlinkRowConverter.java     |   4 +
 .../FlinkSimpleVersionedSerializer.java            |   7 +
 .../serialization/FlinkWriterStateSerializer.java  |   7 +
 .../flink/serialization/KryoTypeInfo.java          |   0
 .../flink/serialization/WrappedRow.java            |   0
 .../translation/flink/sink/CommitWrapper.java      |   5 +
 .../translation/flink/sink/FlinkCommitter.java     |   6 +
 .../flink/sink/FlinkGlobalCommitter.java           |   8 +
 .../translation/flink/sink/FlinkSink.java          |   9 +-
 .../translation/flink/sink/FlinkSinkWriter.java    |  10 +-
 .../translation/flink/sink/FlinkWriterState.java   |   4 +
 .../flink/source/BaseSeaTunnelSourceFunction.java  |   3 +
 .../translation/flink/source/RowCollector.java     |   0
 .../flink/source/SeaTunnelCoordinatedSource.java   |   3 +
 .../flink/source/SeaTunnelParallelSource.java      |   3 +
 .../flink/utils/TypeConverterUtils.java            |   2 +
 tools/dependencies/known-dependencies.txt          |  30 +--
 91 files changed, 358 insertions(+), 691 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3ce902179..8d3f449d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <commons-logging.version>1.2</commons-logging.version>
         <iceberg.version>0.13.1</iceberg.version>
         <flink.1.13.6.version>1.13.6</flink.1.13.6.version>
+        <flink.1.15.3.version>1.15.3</flink.1.15.3.version>
         <spark.2.4.0.version>2.4.0</spark.2.4.0.version>
         <spark.binary.2.4.version>2.4</spark.binary.2.4.version>
         <commons.beanutils.version>1.9.4</commons.beanutils.version>
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
index 3e4fae550..0a3fbc240 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/enums/EngineType.java
@@ -22,7 +22,8 @@ package org.apache.seatunnel.core.starter.enums;
  */
 public enum EngineType {
     SPARK("spark", "seatunnel-spark-starter.jar", "start-seatunnel-spark-connector-v2.sh"),
-    FLINK("flink", "seatunnel-flink-starter.jar", "start-seatunnel-flink-connector-v2.sh"),
+    FLINK13("flink", "seatunnel-flink-13-starter.jar", "start-seatunnel-flink-13-connector-v2.sh"),
+    FLINK15("flink", "seatunnel-flink-15-starter.jar", "start-seatunnel-flink-15-connector-v2.sh"),
     SEATUNNEL("seatunnel", "seatunnel-starter.jar", "seatunnel.sh");
 
     private final String engine;
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index cc135bbae..a1ca2f5d2 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -30,10 +30,14 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>seatunnel-flink-starter</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>seatunnel-flink-13-starter</module>
+        <module>seatunnel-flink-15-starter</module>
+    </modules>
 
     <properties>
         <docker.repo>seatunnel-flink</docker.repo>
-        <avro.version>1.8.2</avro.version>
     </properties>
 
     <dependencies>
@@ -44,87 +48,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <!-- flink-translation -->
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-flink</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- flink 1.13.6 java api -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>${flink.scope}</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>${flink.scope}</scope>
-        </dependency>
-
-        <!-- flink planner api -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>${flink.scope}</scope>
-        </dependency>
-
-        <!-- flink statebackend-rocksdb api -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <!-- flink format api -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-csv</artifactId>
-            <version>${flink.1.13.6.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-orc_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-parquet_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-json</artifactId>
-            <version>${flink.1.13.6.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>avro</artifactId>
-                    <groupId>org.apache.avro</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>${avro.version}</version>
-        </dependency>
-
     </dependencies>
 
     <build>
@@ -174,80 +97,4 @@
             </plugin>
         </plugins>
     </build>
-
-    <profiles>
-        <!--todo docker-->
-<!--        <profile>
-            <id>docker</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>exec-maven-plugin</artifactId>
-                        <version>${exec-maven-plugin.version}</version>
-                        <executions>
-                            <execution>
-                                <id>docker-build</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>exec</goal>
-                                </goals>
-                                <configuration>
-                                    <environmentVariables>
-                                        <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
-                                    </environmentVariables>
-                                    <executable>docker</executable>
-                                    <workingDirectory>${project.basedir}</workingDirectory>
-                                    <arguments>
-                                        <argument>build</argument>
-                                        <argument>&#45;&#45;no-cache</argument>
-                                        <argument>&#45;&#45;build-arg</argument>
-                                        <argument>SCALA_VERSION=${scala.binary.version}</argument>
-                                        <argument>&#45;&#45;build-arg</argument>
-                                        <argument>FLINK_VERSION=${flink.version}</argument>
-                                        <argument>-t</argument>
-                                        <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
-                                        <argument>-t</argument>
-                                        <argument>${docker.hub}/${docker.repo}:latest</argument>
-                                        <argument>.</argument>
-                                        <argument>&#45;&#45;file=src/main/docker/Dockerfile</argument>
-                                    </arguments>
-                                </configuration>
-                            </execution>
-                            <execution>
-                                <id>docker-push</id>
-                                <phase>deploy</phase>
-                                <goals>
-                                    <goal>exec</goal>
-                                </goals>
-                                <configuration>
-                                    <environmentVariables>
-                                        <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
-                                    </environmentVariables>
-                                    <executable>docker</executable>
-                                    <workingDirectory>${project.basedir}</workingDirectory>
-                                    <arguments>
-                                        <argument>buildx</argument>
-                                        <argument>build</argument>
-                                        <argument>&#45;&#45;no-cache</argument>
-                                        <argument>&#45;&#45;build-arg</argument>
-                                        <argument>SCALA_VERSION=${scala.binary.version}</argument>
-                                        <argument>&#45;&#45;build-arg</argument>
-                                        <argument>FLINK_VERSION=${flink.version}</argument>
-                                        <argument>&#45;&#45;push</argument>
-                                        <argument>-t</argument>
-                                        <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
-                                        <argument>-t</argument>
-                                        <argument>${docker.hub}/${docker.repo}:latest</argument>
-                                        <argument>.</argument>
-                                        <argument>&#45;&#45;file=src/main/docker/Dockerfile</argument>
-                                    </arguments>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>-->
-    </profiles>
 </project>
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
similarity index 71%
copy from seatunnel-translation/seatunnel-translation-flink/pom.xml
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
index 3530afaf9..382edb38f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
@@ -1,56 +1,74 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-translation</artifactId>
+        <artifactId>seatunnel-flink-starter</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-translation-flink</artifactId>
+    <artifactId>seatunnel-flink-13-starter</artifactId>
 
     <dependencies>
+
+        <!-- flink-translation -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-base</artifactId>
+            <artifactId>seatunnel-translation-flink-13</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!-- apache flink table -->
+
+        <!-- flink 1.13.6 java api -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <artifactId>flink-java</artifactId>
             <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
+            <scope>${flink.scope}</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
             <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
+            <scope>${flink.scope}</scope>
         </dependency>
 
+        <!-- flink planner api -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.1.13.6.version}</version>
+            <scope>${flink.scope}</scope>
+        </dependency>
+
+        <!-- flink state backend rocksdb api -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
             <version>${flink.1.13.6.version}</version>
             <scope>provided</scope>
         </dependency>
 
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh
similarity index 97%
copy from seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh
index c4a4e0283..f2c61f219 100755
--- a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh
@@ -36,7 +36,7 @@ done
 PRG_DIR=`dirname "$PRG"`
 APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
 CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/starter/seatunnel-flink-starter.jar
+APP_JAR=${APP_DIR}/starter/seatunnel-flink-13-starter.jar
 APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"
 
 if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
similarity index 95%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 7de456a4a..c08086137 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -32,8 +32,8 @@ import java.util.Objects;
  */
 public class FlinkStarter implements Starter {
     private static final String APP_NAME = SeaTunnelFlink.class.getName();
-    public static final String APP_JAR_NAME = EngineType.FLINK.getStarterJarName();
-    public static final String SHELL_NAME = EngineType.FLINK.getStarterShellName();
+    public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
+    public static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName();
     private final FlinkCommandArgs flinkCommandArgs;
     private final String appJar;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
similarity index 95%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 64a1b1c25..cb43cf87a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 public class SeaTunnelFlink {
     public static void main(String[] args) throws CommandException {
         FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(),
-                EngineType.FLINK.getStarterShellName(), true);
+                EngineType.FLINK13.getStarterShellName(), true);
         SeaTunnel.run(flinkCommandArgs.buildCommand());
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
similarity index 98%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index acf732d81..1850480c4 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.util.TableUtil;
+import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
similarity index 98%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 8f94c2bd8..f04134ae7 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.util.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.util.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.util.TableUtil;
+import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
+import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
+import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
similarity index 99%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a6e5cccfd..6ca5a541c 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -38,13 +38,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.types.Row;
 
+import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import scala.Serializable;
-
 public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor
         <SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
similarity index 100%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
similarity index 98%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index ed66e19b8..6804e604c 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 public class ConfigKeyName {
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
similarity index 98%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
index 6ab5647a5..22fbe6401 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 import org.apache.seatunnel.common.config.CheckResult;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
similarity index 97%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index 8335d2d2a..8e4f1cde7 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
similarity index 71%
copy from seatunnel-translation/seatunnel-translation-flink/pom.xml
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
index 3530afaf9..c86c0f54f 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
@@ -1,56 +1,70 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-translation</artifactId>
+        <artifactId>seatunnel-flink-starter</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-translation-flink</artifactId>
+    <artifactId>seatunnel-flink-15-starter</artifactId>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-base</artifactId>
+            <artifactId>seatunnel-translation-flink-15</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!-- apache flink table -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.1.15.3.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
 
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh
similarity index 97%
rename from seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh
index c4a4e0283..137b8c043 100755
--- a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh
@@ -36,7 +36,7 @@ done
 PRG_DIR=`dirname "$PRG"`
 APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
 CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/starter/seatunnel-flink-starter.jar
+APP_JAR=${APP_DIR}/starter/seatunnel-flink-15-starter.jar
 APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"
 
 if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
similarity index 95%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 7de456a4a..9cd7ab8fa 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -32,8 +32,8 @@ import java.util.Objects;
  */
 public class FlinkStarter implements Starter {
     private static final String APP_NAME = SeaTunnelFlink.class.getName();
-    public static final String APP_JAR_NAME = EngineType.FLINK.getStarterJarName();
-    public static final String SHELL_NAME = EngineType.FLINK.getStarterShellName();
+    public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName();
+    public static final String SHELL_NAME = EngineType.FLINK15.getStarterShellName();
     private final FlinkCommandArgs flinkCommandArgs;
     private final String appJar;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
similarity index 55%
rename from seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
index 9ea5db7a7..935df6ac9 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.args;
+package org.apache.seatunnel.core.starter.flink;
 
+import org.apache.seatunnel.core.starter.SeaTunnel;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-
-public class FlinkCommandArgsTest {
-
-    @Test
-    public void testParseFlinkArgs() {
-        String[] args = {"-c", "app.conf", "--check", "-i", "city=shenyang", "-i", "date=20200202"};
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), "seatunnel-flink", true);
-        Assertions.assertEquals("app.conf", flinkArgs.getConfigFile());
-        Assertions.assertTrue(flinkArgs.isCheckConfig());
-        Assertions.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
+public class SeaTunnelFlink {
+    public static void main(String[] args) throws CommandException {
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(),
+                EngineType.FLINK15.getStarterShellName(), true);
+        SeaTunnel.run(flinkCommandArgs.buildCommand());
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index acf732d81..1850480c4 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.util.TableUtil;
+import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
similarity index 98%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 8f94c2bd8..ee9bfca96 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.flink.util.ConfigKeyName;
-import org.apache.seatunnel.core.starter.flink.util.EnvironmentUtil;
-import org.apache.seatunnel.core.starter.flink.util.TableUtil;
+import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
+import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
+import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -154,7 +154,6 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
     private void createStreamTableEnvironment() {
         EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                 .inStreamingMode()
-                .useBlinkPlanner()
                 .build();
         tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
         TableConfig config = tableEnvironment.getConfig();
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
similarity index 94%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a6e5cccfd..001f95db7 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -36,15 +36,15 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
 import org.apache.flink.types.Row;
 
+import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import scala.Serializable;
-
 public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor
         <SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
 
@@ -88,7 +88,8 @@ public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor
                 DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
                 saveModeSink.handleSaveMode(dataSaveMode);
             }
-            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+            DataStreamSink<Row> dataStreamSink = stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(seaTunnelSink)))
+                    .name(seaTunnelSink.getPluginName());
             if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                 int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
similarity index 98%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
index ed66e19b8..6804e604c 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/ConfigKeyName.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 public class ConfigKeyName {
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
similarity index 98%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
index 6ab5647a5..22fbe6401 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/EnvironmentUtil.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 import org.apache.seatunnel.common.config.CheckResult;
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
similarity index 97%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
index 8335d2d2a..8e4f1cde7 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/TableUtil.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.util;
+package org.apache.seatunnel.core.starter.flink.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/docker/Dockerfile b/seatunnel-core/seatunnel-flink-starter/src/main/docker/Dockerfile
deleted file mode 100644
index 427ad6c34..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/docker/Dockerfile
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-ARG BASE_IMAGE=adoptopenjdk/openjdk8:jre
-
-FROM $BASE_IMAGE
-
-ARG SCALA_VERSION
-ARG FLINK_VERSION
-
-RUN mkdir -p /flink ; cd /flink ; \
-    tar_file=flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz ; \
-    curl -LsO https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/$tar_file ; \
-    tar -zxf $tar_file --strip 1 -C . ; \
-    rm $tar_file
-
-ENV FLINK_HOME=/flink
-
-WORKDIR /seatunnel
-
-COPY target/seatunnel-core-flink.jar /seatunnel/starter/
-COPY src/main/bin /seatunnel/bin/
-
-ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-flink-connector-v2.sh" ]
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/SchemaUtil.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/SchemaUtil.java
deleted file mode 100644
index f945f0ffc..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/util/SchemaUtil.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.util;
-
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.DecimalNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.DoubleNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.FloatNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.IntNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.LongNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.api.scala.typeutils.Types;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
-import org.apache.flink.table.descriptors.Avro;
-import org.apache.flink.table.descriptors.Csv;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptor;
-import org.apache.flink.table.descriptors.Json;
-import org.apache.flink.table.descriptors.Schema;
-import org.apache.flink.table.utils.TypeStringUtils;
-import org.apache.flink.types.Row;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-public final class SchemaUtil {
-
-    private static final Pattern DASH_COMPILE = Pattern.compile("-");
-
-    private SchemaUtil() {
-    }
-
-    public enum FormatType {
-
-        JSON("json"),
-        CSV("csv"),
-        ORC("orc"),
-        AVRO("avro"),
-        PARQUET("parquet"),
-        TEXT("text"),
-        ;
-
-        private final String name;
-
-        private static final Map<String, FormatType> NAME_MAP = Arrays.stream(FormatType.values())
-                .collect(Collectors.toMap(FormatType::getName, Function.identity()));
-
-        FormatType(String name) {
-            this.name = name;
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public static FormatType from(String name) {
-            return NAME_MAP.get(name);
-        }
-    }
-
-    public static void setSchema(Schema schema, Object info, FormatType format) {
-
-        switch (format) {
-            case JSON:
-                getJsonSchema(schema, (ObjectNode) info);
-                break;
-            case CSV:
-                getCsvSchema(schema, (ArrayNode) info);
-                break;
-            case ORC:
-                getOrcSchema(schema, (ObjectNode) info);
-                break;
-            case AVRO:
-                getAvroSchema(schema, (ObjectNode) info);
-                break;
-            case PARQUET:
-                getParquetSchema(schema, (ObjectNode) info);
-                break;
-            default:
-        }
-    }
-
-    public static FormatDescriptor setFormat(FormatType format, Config config) throws Exception {
-        FormatDescriptor formatDescriptor = null;
-        switch (format) {
-            case JSON:
-                formatDescriptor = new Json().failOnMissingField(false).deriveSchema();
-                break;
-            case CSV:
-                Csv csv = new Csv().deriveSchema();
-                Field interPro = csv.getClass().getDeclaredField("internalProperties");
-                interPro.setAccessible(true);
-                Object desc = interPro.get(csv);
-                Class<DescriptorProperties> descCls = DescriptorProperties.class;
-                Method putMethod = descCls.getDeclaredMethod("put", String.class, String.class);
-                putMethod.setAccessible(true);
-                for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
-                    String key = entry.getKey();
-                    if (key.startsWith("format.") && !StringUtils.equals(key, "format.type")) {
-                        String value = config.getString(key);
-                        putMethod.invoke(desc, key, value);
-                    }
-                }
-                formatDescriptor = csv;
-                break;
-            case AVRO:
-                formatDescriptor = new Avro().avroSchema(config.getString("schema"));
-                break;
-            case ORC:
-            case PARQUET:
-            default:
-                break;
-        }
-        return formatDescriptor;
-    }
-
-    private static void getJsonSchema(Schema schema, ObjectNode json) {
-        Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
-        while (nodeIterator.hasNext()) {
-            Map.Entry<String, JsonNode> entry = nodeIterator.next();
-            String key = entry.getKey();
-            Object value = entry.getValue();
-            if (value instanceof TextNode) {
-                schema.field(key, Types.STRING());
-            } else if (value instanceof IntNode) {
-                schema.field(key, Types.INT());
-            } else if (value instanceof LongNode) {
-                schema.field(key, Types.LONG());
-            } else if (value instanceof DecimalNode) {
-                schema.field(key, Types.JAVA_BIG_DEC());
-            } else if (value instanceof FloatNode) {
-                schema.field(key,  Types.FLOAT());
-            } else if (value instanceof DoubleNode) {
-                schema.field(key, Types.DOUBLE());
-            } else if (value instanceof ObjectNode) {
-                schema.field(key, getTypeInformation((ObjectNode) value));
-            } else if (value instanceof ArrayNode) {
-                Object obj = ((ArrayNode) value).get(0);
-                if (obj instanceof ObjectNode) {
-                    schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((ObjectNode) obj)));
-                } else {
-                    schema.field(key, ObjectArrayTypeInfo.getInfoFor(Object[].class, TypeInformation.of(Object.class)));
-                }
-            }
-        }
-    }
-
-    private static void getCsvSchema(Schema schema, ArrayNode schemaList) {
-        Iterator<JsonNode> iterator = schemaList.elements();
-
-        while (iterator.hasNext()) {
-            JsonNode jsonNode = iterator.next();
-            String field = jsonNode.get("field").textValue();
-            String type = jsonNode.get("type").textValue().toUpperCase();
-
-            schema.field(field, type);
-        }
-    }
-
-    public static TypeInformation<?>[] getCsvType(List<Map<String, String>> schemaList) {
-        TypeInformation<?>[] typeInformation = new TypeInformation[schemaList.size()];
-        int i = 0;
-        for (Map<String, String> map : schemaList) {
-            String type = map.get("type").toUpperCase();
-            typeInformation[i++] = TypeStringUtils.readTypeInfo(type);
-        }
-        return typeInformation;
-    }
-
-    /**
-     * todo
-     *
-     * @param schema schema
-     * @param json   json
-     */
-    private static void getOrcSchema(Schema schema, ObjectNode json) {
-
-    }
-
-    /**
-     * todo
-     *
-     * @param schema schema
-     * @param json   json
-     */
-    private static void getParquetSchema(Schema schema, ObjectNode json) {
-
-    }
-
-    private static void getAvroSchema(Schema schema, ObjectNode json) {
-        RowTypeInfo typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(json.toString());
-        String[] fieldNames = typeInfo.getFieldNames();
-        for (String name : fieldNames) {
-            schema.field(name, typeInfo.getTypeAt(name));
-        }
-    }
-
-    public static RowTypeInfo getTypeInformation(ObjectNode json) {
-        int size = json.size();
-        String[] fields = new String[size];
-        TypeInformation<?>[] informations = new TypeInformation[size];
-        int i = 0;
-        Iterator<Map.Entry<String, JsonNode>> nodeIterator = json.fields();
-        while (nodeIterator.hasNext()) {
-            Map.Entry<String, JsonNode> entry = nodeIterator.next();
-            String key = entry.getKey();
-            Object value = entry.getValue();
-            fields[i] = key;
-            if (value instanceof TextNode) {
-                informations[i] = Types.STRING();
-            } else if (value instanceof IntNode) {
-                informations[i] = Types.INT();
-            } else if (value instanceof LongNode) {
-                informations[i] = Types.LONG();
-            } else if (value instanceof DecimalNode) {
-                informations[i] = Types.JAVA_BIG_DEC();
-            } else if (value instanceof FloatNode) {
-                informations[i] = Types.FLOAT();
-            } else if (value instanceof DoubleNode) {
-                informations[i] = Types.DOUBLE();
-            } else if (value instanceof ObjectNode) {
-                informations[i] = getTypeInformation((ObjectNode) value);
-            } else if (value instanceof ArrayNode) {
-                ObjectNode demo = (ObjectNode) ((ArrayNode) value).get(0);
-                informations[i] = ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation(demo));
-            }
-            i++;
-        }
-        return new RowTypeInfo(informations, fields);
-    }
-
-    public static String getUniqueTableName() {
-        return DASH_COMPILE.matcher(UUID.randomUUID().toString()).replaceAll("_");
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
deleted file mode 100644
index 01b47f9a4..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class FlinkStarterTest {
-
-    @Test
-    public void testConfigOption() {
-        String[] args = {"--config", "test.conf"};
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-        Assertions.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
-    }
-
-    @Test
-    public void testSubmitMasterOption() {
-        String[] args = {"--config", "test.conf", "-n", "test-flink-job", "--master", "yarn-session"};
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-        Assertions.assertTrue(flinkExecuteCommand.contains("--target yarn-session"));
-    }
-
-    @Test
-    public void testDeployModeOption() {
-        String[] args = {"--config", "test.conf", "-n", "test-flink-job", "--master", "yarn-per-job", "-e", "run-application"};
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-        Assertions.assertTrue(flinkExecuteCommand.contains("--target yarn-per-job"));
-        Assertions.assertTrue(flinkExecuteCommand.contains("flink run-application"));
-    }
-
-    @Test
-    public void testExtraParametersOption() {
-        String[] args = {"--config", "test.conf", "-n", "test-flink-job", "--master", "yarn-per-job", "-m", "192.168.1.1:8080"};
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-        Assertions.assertTrue(flinkExecuteCommand.contains("--target yarn-per-job"));
-        Assertions.assertTrue(flinkExecuteCommand.contains("-m 192.168.1.1:8080"));
-    }
-
-    @Test
-    public void testExtraVariablesOption() {
-        String[] args = {"--config", "test.conf", "-n", "test-flink-job", "-i", "name=tyrantlucifer", "-i", "age=26"};
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-        Assertions.assertTrue(flinkExecuteCommand.contains("-Dname=tyrantlucifer"));
-        Assertions.assertTrue(flinkExecuteCommand.contains("-Dage=26"));
-    }
-}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index e5dc2c2c7..170b85016 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -107,7 +107,13 @@
                 <!-- starters -->
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
-                    <artifactId>seatunnel-flink-starter</artifactId>
+                    <artifactId>seatunnel-flink-13-starter</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>seatunnel-flink-15-starter</artifactId>
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
@@ -568,7 +574,13 @@
                 </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
-                    <artifactId>seatunnel-flink-starter</artifactId>
+                    <artifactId>seatunnel-flink-13-starter</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>seatunnel-flink-15-starter</artifactId>
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 3be579543..a51656968 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -50,7 +50,12 @@
         </fileSet>
         <!-- ============ Starter Bin ============  -->
         <fileSet>
-            <directory>../seatunnel-core/seatunnel-flink-starter/src/main/bin</directory>
+            <directory>../seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+            <fileMode>0755</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>../seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin</directory>
             <outputDirectory>/bin</outputDirectory>
             <fileMode>0755</fileMode>
         </fileSet>
@@ -117,7 +122,8 @@
             <unpack>false</unpack>
             <includes>
                 <!-- Flink V2 starter -->
-                <include>org.apache.seatunnel:seatunnel-flink-starter:jar</include>
+                <include>org.apache.seatunnel:seatunnel-flink-13-starter:jar</include>
+                <include>org.apache.seatunnel:seatunnel-flink-15-starter:jar</include>
                 <!-- Spark V2 starter -->
                 <include>org.apache.seatunnel:seatunnel-spark-starter:jar</include>
                 <!-- SeaTunnel Engine starter -->
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 7d724bc5a..66d3d086b 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -52,7 +52,12 @@
         <!-- ============ Starter Bin ============  -->
         <!--connector starter v2-->
         <fileSet>
-            <directory>../seatunnel-core/seatunnel-flink-starter/src/main/bin</directory>
+            <directory>../seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+            <fileMode>0755</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>../seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin</directory>
             <outputDirectory>/bin</outputDirectory>
             <fileMode>0755</fileMode>
         </fileSet>
@@ -138,7 +143,8 @@
             <unpack>false</unpack>
             <includes>
                 <!-- Flink V2 starter -->
-                <include>org.apache.seatunnel:seatunnel-flink-starter:jar</include>
+                <include>org.apache.seatunnel:seatunnel-flink-13-starter:jar</include>
+                <include>org.apache.seatunnel:seatunnel-flink-15-starter:jar</include>
                 <!-- Spark V2 starter -->
                 <include>org.apache.seatunnel:seatunnel-spark-starter:jar</include>
                 <!-- SeaTunnel Engine starter -->
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 2cc1672d1..9466f5832 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -63,7 +63,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>seatunnel-flink-13-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-15-starter</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
@@ -73,6 +79,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
index 945c1f1d3..0d7db68b7 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainerId.java
@@ -28,6 +28,8 @@ import lombok.Getter;
 public enum TestContainerId {
     FLINK_1_13(FLINK, "1.13.6"),
     FLINK_1_14(FLINK, "1.14.6"),
+    FLINK_1_15(FLINK, "1.15.3"),
+    FLINK_1_16(FLINK, "1.16.0"),
     SPARK_2_4(SPARK, "2.4.6"),
     SEATUNNEL(EngineType.SEATUNNEL, "2.2.0");
 
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
index 0a7af6454..f4a0743f3 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
+import java.io.File;
+
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
@@ -44,12 +46,12 @@ public class Flink13Container extends AbstractTestFlinkContainer {
 
     @Override
     protected String getStartModuleName() {
-        return "seatunnel-flink-starter";
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-13-starter";
     }
 
     @Override
     protected String getStartShellName() {
-        return "start-seatunnel-flink-connector-v2.sh";
+        return "start-seatunnel-flink-13-connector-v2.sh";
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink14Container.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink14Container.java
index b76296022..32a301e5a 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink14Container.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink14Container.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
+import java.io.File;
+
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
@@ -44,12 +46,12 @@ public class Flink14Container extends AbstractTestFlinkContainer {
 
     @Override
     protected String getStartModuleName() {
-        return "seatunnel-flink-starter";
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-13-starter";
     }
 
     @Override
     protected String getStartShellName() {
-        return "start-seatunnel-flink-connector-v2.sh";
+        return "start-seatunnel-flink-13-connector-v2.sh";
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink15Container.java
similarity index 82%
copy from seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
copy to seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink15Container.java
index 0a7af6454..17e41aacf 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink15Container.java
@@ -23,33 +23,35 @@ import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
+import java.io.File;
+
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link Flink13Container#executeJob} to submit a seatunnel config and run a seatunnel job.
+ * You can use {@link Flink15Container#executeJob} to submit a seatunnel config and run a seatunnel job.
  */
 @NoArgsConstructor
 @AutoService(TestContainer.class)
-public class Flink13Container extends AbstractTestFlinkContainer {
+public class Flink15Container extends AbstractTestFlinkContainer {
 
     @Override
     public TestContainerId identifier() {
-        return TestContainerId.FLINK_1_13;
+        return TestContainerId.FLINK_1_15;
     }
 
     @Override
     protected String getDockerImage() {
-        return "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
+        return "tyrantlucifer/flink:1.15.3-scala_2.12_hadoop27";
     }
 
     @Override
     protected String getStartModuleName() {
-        return "seatunnel-flink-starter";
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-15-starter";
     }
 
     @Override
     protected String getStartShellName() {
-        return "start-seatunnel-flink-connector-v2.sh";
+        return "start-seatunnel-flink-15-connector-v2.sh";
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink16Container.java
similarity index 82%
copy from seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
copy to seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink16Container.java
index 0a7af6454..f514949fb 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink13Container.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/Flink16Container.java
@@ -23,33 +23,35 @@ import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
+import java.io.File;
+
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link Flink13Container#executeJob} to submit a seatunnel config and run a seatunnel job.
+ * You can use {@link Flink16Container#executeJob} to submit a seatunnel config and run a seatunnel job.
  */
 @NoArgsConstructor
 @AutoService(TestContainer.class)
-public class Flink13Container extends AbstractTestFlinkContainer {
+public class Flink16Container extends AbstractTestFlinkContainer {
 
     @Override
     public TestContainerId identifier() {
-        return TestContainerId.FLINK_1_13;
+        return TestContainerId.FLINK_1_16;
     }
 
     @Override
     protected String getDockerImage() {
-        return "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
+        return "tyrantlucifer/flink:1.16.0-scala_2.12_hadoop27";
     }
 
     @Override
     protected String getStartModuleName() {
-        return "seatunnel-flink-starter";
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-15-starter";
     }
 
     @Override
     protected String getStartShellName() {
-        return "start-seatunnel-flink-connector-v2.sh";
+        return "start-seatunnel-flink-15-connector-v2.sh";
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index 6c762844c..60f95ce29 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -106,7 +106,9 @@ public final class ContainerUtil {
                                                        String startModuleName,
                                                        String startModulePath,
                                                        String seatunnelHomeInContainer) {
-        final String startJarName = startModuleName + ".jar";
+        // module may be nested (parent/child): use the last segment; split() would misread "\" as a regex
+        final String leafModuleName = startModuleName.substring(startModuleName.lastIndexOf(File.separator) + 1);
+        final String startJarName = leafModuleName + ".jar";
         // copy starter
         final String startJarPath = startModulePath + File.separator + "target" + File.separator + startJarName;
         checkPathExist(startJarPath);
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 21c96ac8f..9b7993cff 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.e2e.flink;
 import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 
+import java.io.File;
+
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
@@ -39,12 +41,12 @@ public abstract class FlinkContainer extends AbstractFlinkContainer {
 
     @Override
     protected String getStartModuleName() {
-        return "seatunnel-flink-starter";
+        return "seatunnel-flink-starter" + File.separator + "seatunnel-flink-13-starter";
     }
 
     @Override
     protected String getStartShellName() {
-        return "start-seatunnel-flink-connector-v2.sh";
+        return "start-seatunnel-flink-13-connector-v2.sh";
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 78ac1bc32..722ab02c7 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>seatunnel-flink-13-starter</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-transforms-v2-e2e/pom.xml
index b86a80fbe..fc4b69fdf 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/pom.xml
@@ -35,7 +35,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>seatunnel-flink-13-starter</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-15-starter</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 98219890a..e0643d9bb 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -40,9 +40,10 @@
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
         </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
+            <artifactId>seatunnel-flink-13-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index 3530afaf9..ae4821fb4 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -24,33 +24,19 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>seatunnel-translation-flink</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>seatunnel-translation-flink-13</module>
+        <module>seatunnel-translation-flink-15</module>
+    </modules>
 
     <dependencies>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-translation-base</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <!-- apache flink table -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
-        </dependency>
 
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
similarity index 84%
copy from seatunnel-translation/seatunnel-translation-flink/pom.xml
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
index 3530afaf9..cb5cd36da 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
@@ -17,27 +17,23 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-translation</artifactId>
+        <artifactId>seatunnel-translation-flink</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-translation-flink</artifactId>
+    <artifactId>seatunnel-translation-flink-13</artifactId>
 
     <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <!-- apache flink table -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
             <version>${flink.1.13.6.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
@@ -53,4 +49,5 @@
         </dependency>
 
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
similarity index 89%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
index 65500b719..0618adcb0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.flink.serialization;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.ByteArrayInputStream;
@@ -28,6 +29,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of the commit message serializer,
+ * which is created by {@link Sink#getCommittableSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of commit message
+ */
 public class CommitWrapperSerializer<T> implements SimpleVersionedSerializer<CommitWrapper<T>> {
     private final Serializer<T> serializer;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
similarity index 97%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index 0eba14b52..1949d2cef 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -32,6 +32,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
 
+/**
+ * The row converter between {@link Row} and {@link SeaTunnelRow},
+ * used to convert between flink rows and seatunnel rows in either direction
+ */
 public class FlinkRowConverter extends RowConverter<Row> {
 
     public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 83%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
index 98042c596..b5e072b93 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
@@ -19,10 +19,17 @@ package org.apache.seatunnel.translation.flink.serialization;
 
 import org.apache.seatunnel.api.serialization.Serializer;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of aggregate commit message serializer,
+ * which is created by {@link Sink#getGlobalCommittableSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of aggregate commit message
+ */
 public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
 
     private final Serializer<T> serializer;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
similarity index 90%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
index 5790cb85b..703f27976 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.flink.serialization;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.ByteArrayInputStream;
@@ -28,6 +29,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of writer state serializer,
+ * which is created by {@link Sink#getWriterStateSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of writer state
+ */
 public class FlinkWriterStateSerializer<T> implements SimpleVersionedSerializer<FlinkWriterState<T>> {
     private final Serializer<T> serializer;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
similarity index 100%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 100%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
similarity index 83%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
index 1b91085af..574da5d6c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
+/**
+ * The commit message wrapper, which is used to wrap the different commit messages
+ * and unify the different implementations of {@code CommitT}
+ * @param <CommitT> The generic type of commit message
+ */
 public class CommitWrapper<CommitT> {
     private final CommitT commit;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 87%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 2f47df964..e1ba5bd17 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -21,12 +21,18 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The committer wrapper of {@link SinkCommitter}, which is created by {@link Sink#createCommitter()},
+ * used to unify the different sink committer implementations
+ * @param <CommT> The generic type of commit message
+ */
 @Slf4j
 public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
similarity index 86%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index d9a060d7a..26bdfadd6 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,6 +29,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The committer wrapper of {@link SinkAggregatedCommitter},
+ * which is created by {@link Sink#createGlobalCommitter()},
+ * used to unify the different implementations of {@link SinkAggregatedCommitter}
+ * @param <CommT> The generic type of commit message
+ * @param <GlobalCommT> The generic type of global commit message
+ */
 @Slf4j
 public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
similarity index 92%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index a3e927240..71245c995 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -35,6 +35,13 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+/**
+ * The sink implementation of {@link Sink}, the entrypoint of flink sink translation
+ * @param <InputT> The generic type of input data
+ * @param <CommT> The generic type of commit message
+ * @param <WriterStateT> The generic type of writer state
+ * @param <GlobalCommT> The generic type of global commit message
+ */
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>,
         FlinkWriterState<WriterStateT>, GlobalCommT> {
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
similarity index 90%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index beb5976a0..734cb1927 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.types.Row;
 
@@ -31,6 +32,13 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+/**
+ * The sink writer implementation of {@link SinkWriter},
+ * which is created by {@link Sink#createWriter}
+ * @param <InputT> The generic type of input data
+ * @param <CommT> The generic type of commit message
+ * @param <WriterStateT> The generic type of writer state
+ */
 public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
 
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 89%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
index 095fb07d0..7d21e1c29 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
@@ -19,6 +19,10 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import java.io.Serializable;
 
+/**
+ * The writer state wrapper of {@code StateT}, used to unify the different implementations of {@code StateT}
+ * @param <StateT> The generic type of the writer state
+ */
 public class FlinkWriterState<StateT> implements Serializable {
 
     private long checkpointId = 0;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 98%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 0d811b38c..5ec31c2b7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -46,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * The abstract implementation of {@link RichSourceFunction}, the entrypoint of flink source translation
+ */
 public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row>
     implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
     private static final Logger LOG = LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
similarity index 100%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
similarity index 94%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
index a6992341e..3dc8d850b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.source.BaseSourceFunction;
 import org.apache.seatunnel.translation.source.CoordinatedSource;
 
+/**
+ * The coordinated source function implementation of {@link BaseSeaTunnelSourceFunction}
+ */
 public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
 
     protected static final String COORDINATED_SOURCE_STATE_NAME = "coordinated-source-states";
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
similarity index 95%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index c3f8bfeb7..9c72e778c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -25,6 +25,9 @@ import org.apache.seatunnel.translation.source.ParallelSource;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.types.Row;
 
+/**
+ * The parallel source function implementation of {@link BaseSeaTunnelSourceFunction}
+ */
 public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction implements ParallelSourceFunction<Row> {
 
     protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
similarity index 98%
copy from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index de452c7de..f19ee79b5 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -59,6 +59,7 @@ public class TypeConverterUtils {
         BRIDGED_TYPES.put(Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
         BRIDGED_TYPES.put(Double.class, BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
         BRIDGED_TYPES.put(Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
+        // TODO: there is still an unresolved issue that the BigDecimal type will lose the precision and scale
         BRIDGED_TYPES.put(BigDecimal.class, BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
         // data time types
         BRIDGED_TYPES.put(LocalDate.class, BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
similarity index 72%
copy from seatunnel-translation/seatunnel-translation-flink/pom.xml
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
index 3530afaf9..b1314b51d 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
@@ -17,40 +17,44 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-translation</artifactId>
+        <artifactId>seatunnel-translation-flink</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-translation-flink</artifactId>
+    <artifactId>seatunnel-translation-flink-15</artifactId>
 
     <dependencies>
+
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-base</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.1.15.3.version}</version>
+            <scope>provided</scope>
         </dependency>
-        <!-- apache flink table -->
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.1.13.6.version}</version>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.1.15.3.version}</version>
             <scope>provided</scope>
         </dependency>
 
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
similarity index 89%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
index 65500b719..0618adcb0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.flink.serialization;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.ByteArrayInputStream;
@@ -28,6 +29,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of the commit message serializer,
+ * which is created by {@link Sink#getCommittableSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of commit message
+ */
 public class CommitWrapperSerializer<T> implements SimpleVersionedSerializer<CommitWrapper<T>> {
     private final Serializer<T> serializer;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
similarity index 97%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
index 0eba14b52..1949d2cef 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
@@ -32,6 +32,10 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.BiFunction;
 
+/**
+ * The row converter between {@link Row} and {@link SeaTunnelRow},
+ * used to convert or reconvert between flink row and seatunnel row
+ */
 public class FlinkRowConverter extends RowConverter<Row> {
 
     public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 83%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
index 98042c596..b5e072b93 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
@@ -19,10 +19,17 @@ package org.apache.seatunnel.translation.flink.serialization;
 
 import org.apache.seatunnel.api.serialization.Serializer;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of aggregate commit message serializer,
+ * which is created by {@link Sink#getGlobalCommittableSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of aggregate commit message
+ */
 public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
 
     private final Serializer<T> serializer;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
similarity index 90%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
index 5790cb85b..703f27976 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.flink.serialization;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.ByteArrayInputStream;
@@ -28,6 +29,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+/**
+ * The serializer wrapper of writer state serializer,
+ * which is created by {@link Sink#getWriterStateSerializer()},
+ * used to unify the different implementations of {@link Serializer}
+ * @param <T> The generic type of writer state
+ */
 public class FlinkWriterStateSerializer<T> implements SimpleVersionedSerializer<FlinkWriterState<T>> {
     private final Serializer<T> serializer;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
similarity index 83%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
index 1b91085af..574da5d6c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
+/**
+ * The commit message wrapper, which is used to wrap the different commit messages
+ * and unify the different implementations of {@code CommitT}
+ * @param <CommitT> The generic type of commit message
+ */
 public class CommitWrapper<CommitT> {
     private final CommitT commit;
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 87%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 2f47df964..e1ba5bd17 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -21,12 +21,18 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The committer wrapper of {@link SinkCommitter}, which is created by {@link Sink#createCommitter()},
+ * used to unify the different sink committer implementations
+ * @param <CommT> The generic type of commit message
+ */
 @Slf4j
 public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
similarity index 86%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index d9a060d7a..26bdfadd6 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,6 +29,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+/**
+ * The committer wrapper of {@link SinkAggregatedCommitter},
+ * which is created by {@link Sink#createGlobalCommitter()},
+ * used to unify the different implementations of {@link SinkAggregatedCommitter}
+ * @param <CommT> The generic type of commit message
+ * @param <GlobalCommT> The generic type of global commit message
+ */
 @Slf4j
 public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
similarity index 89%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index a3e927240..296a81127 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -35,6 +35,13 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+/**
+ * The sink implementation of {@link Sink}, the entrypoint of flink sink translation
+ * @param <InputT> The generic type of input data
+ * @param <CommT> The generic type of commit message
+ * @param <WriterStateT> The generic type of writer state
+ * @param <GlobalCommT> The generic type of global commit message
+ */
 public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink<InputT, CommitWrapper<CommT>,
         FlinkWriterState<WriterStateT>, GlobalCommT> {
 
@@ -45,7 +52,7 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT> implements Sink
     }
 
     @Override
-    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(org.apache.flink.api.connector.sink.Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
+    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext = new DefaultSinkWriterContext(context.getSubtaskId());
 
         if (states == null || states.isEmpty()) {
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
similarity index 88%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index beb5976a0..904a20d3b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.types.Row;
 
@@ -31,6 +32,13 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+/**
+ * The sink writer implementation of {@link SinkWriter},
+ * which is created by {@link Sink#createWriter}
+ * @param <InputT> The generic type of input data
+ * @param <CommT> The generic type of commit message
+ * @param <WriterStateT> The generic type of writer state
+ */
 public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
 
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
@@ -46,7 +54,7 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
     }
 
     @Override
-    public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context) throws IOException {
+    public void write(InputT element, SinkWriter.Context context) throws IOException {
         if (element instanceof Row) {
             sinkWriter.write(rowSerialization.reconvert((Row) element));
         } else {
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 89%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
index 095fb07d0..7d21e1c29 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
@@ -19,6 +19,10 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import java.io.Serializable;
 
+/**
+ * The writer state wrapper of {@code StateT}, used to unify the different implementations of {@code StateT}
+ * @param <StateT> The generic type of the writer state
+ */
 public class FlinkWriterState<StateT> implements Serializable {
 
     private long checkpointId = 0;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 98%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 0d811b38c..5ec31c2b7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -46,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * The abstract implementation of {@link RichSourceFunction}, the entrypoint of flink source translation
+ */
 public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row>
     implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
     private static final Logger LOG = LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
similarity index 94%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
index a6992341e..3dc8d850b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.source.BaseSourceFunction;
 import org.apache.seatunnel.translation.source.CoordinatedSource;
 
+/**
+ * The coordinated source function implementation of {@link BaseSeaTunnelSourceFunction}
+ */
 public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
 
     protected static final String COORDINATED_SOURCE_STATE_NAME = "coordinated-source-states";
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
similarity index 95%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
index c3f8bfeb7..9c72e778c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
@@ -25,6 +25,9 @@ import org.apache.seatunnel.translation.source.ParallelSource;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.types.Row;
 
+/**
+ * The parallel source function implementation of {@link BaseSeaTunnelSourceFunction}
+ */
 public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction implements ParallelSourceFunction<Row> {
 
     protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
similarity index 98%
rename from seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index de452c7de..7eb33428b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -59,7 +59,9 @@ public class TypeConverterUtils {
         BRIDGED_TYPES.put(Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
         BRIDGED_TYPES.put(Double.class, BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
         BRIDGED_TYPES.put(Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
+        // TODO: there is still an unresolved issue that the BigDecimal type will lose its precision and scale
         BRIDGED_TYPES.put(BigDecimal.class, BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
+
         // data time types
         BRIDGED_TYPES.put(LocalDate.class, BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
         BRIDGED_TYPES.put(LocalTime.class, BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 05f6b07ba..a46c3033b 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -1,48 +1,22 @@
-aircompressor-0.10.jar
-audience-annotations-0.11.0.jar
-avro-1.8.2.jar
 commons-codec-1.13.jar
 commons-collections4-4.4.jar
 commons-compress-1.20.jar
 commons-io-2.11.0.jar
-commons-lang-2.6.jar
 commons-lang3-3.4.jar
-commons-pool-1.6.jar
 config-1.3.3.jar
 disruptor-3.4.4.jar
-flink-avro-1.13.6.jar
-flink-csv-1.13.6.jar
-flink-json-1.13.6.jar
-flink-orc_2.11-1.13.6.jar
-flink-parquet_2.11-1.13.6.jar
-force-shading-1.13.6.jar
 guava-19.0.jar
 hazelcast-5.1.jar
-hive-storage-api-2.6.0.jar
 jackson-annotations-2.12.6.jar
 jackson-core-2.12.6.jar
-jackson-core-asl-1.9.13.jar
 jackson-databind-2.12.6.jar
 jackson-dataformat-properties-2.12.6.jar
-jackson-mapper-asl-1.9.13.jar
-javax.annotation-api-1.3.2.jar
 jcl-over-slf4j-1.7.25.jar
 jcommander-1.81.jar
-jsr305-1.3.9.jar
 log4j-api-2.17.1.jar
 log4j-core-2.17.1.jar
 log4j-slf4j-impl-2.17.1.jar
 log4j-1.2-api-2.17.1.jar
-orc-core-1.5.6.jar
-orc-shims-1.5.6.jar
-paranamer-2.7.jar
-parquet-column-1.11.1.jar
-parquet-common-1.11.1.jar
-parquet-encoding-1.11.1.jar
-parquet-format-structures-1.11.1.jar
-parquet-hadoop-1.11.1.jar
-parquet-jackson-1.11.1.jar
-protobuf-java-2.5.0.jar
 protostuff-api-1.8.0.jar
 protostuff-collectionschema-1.8.0.jar
 protostuff-core-1.8.0.jar
@@ -51,6 +25,4 @@ scala-library-2.11.12.jar
 seatunnel-config-base-2.1.1.jar
 seatunnel-config-shade-2.1.1.jar
 seatunnel-jackson-2.3.1-SNAPSHOT-optional.jar
-slf4j-api-1.7.25.jar
-snappy-java-1.1.1.3.jar
-xz-1.5.jar
\ No newline at end of file
+slf4j-api-1.7.25.jar
\ No newline at end of file