You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/05 10:59:04 UTC

[incubator-seatunnel] branch api-draft updated: Add seatunnel datatype and convert origin value into seatunnel data type (#1797)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new c3407957 Add seatunnel datatype and convert origin value into seatunnel data type (#1797)
c3407957 is described below

commit c3407957f0e7e33affefe41d482b0f344a7ae58d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu May 5 18:58:59 2022 +0800

    Add seatunnel datatype and convert origin value into seatunnel data type (#1797)
    
    * Add seatunnel datatype
---
 docs/en/concept/config.md                          | 108 +++++++++++++++++++
 docs/en/connector/config-example.md                |   8 --
 docs/en/faq.md                                     |   5 +-
 docs/en/start/local.mdx                            |   2 +-
 docs/en/transform/replace.md                       |  10 +-
 docs/en/transform/uuid.md                          |  62 +++++++++++
 docs/sidebars.js                                   |  10 +-
 .../seatunnel/api/table/catalog/Catalog.java       |   4 +
 .../seatunnel/api/table/catalog/CatalogTable.java  |  35 +++++--
 .../apache/seatunnel/api/table/catalog/Column.java |   6 ++
 .../seatunnel/api/table/catalog/TableSchema.java   |   3 +
 .../apache/seatunnel/api/table/type/BasicType.java |  23 +++++
 .../table/type/{DataType.java => BooleanType.java} |   7 +-
 .../apache/seatunnel/api/table/type/DataType.java  |  10 +-
 .../type/{DataType.java => DateTimeType.java}      |  16 ++-
 .../table/type/{DataType.java => DateType.java}    |  10 +-
 .../table/type/{DataType.java => DoubleType.java}  |   8 +-
 .../api/table/type/{DataType.java => IntType.java} |   8 +-
 .../table/type/{DataType.java => ListType.java}    |  12 ++-
 .../table/type/{DataType.java => LongType.java}    |   8 +-
 .../table/type/{DataType.java => NullType.java}    |   8 +-
 .../table/type/{DataType.java => StringType.java}  |   8 +-
 .../seatunnel/apis/{ => base/api}/BaseSink.java    |   6 +-
 .../seatunnel/apis/{ => base/api}/BaseSource.java  |   6 +-
 .../apis/{ => base/api}/BaseTransform.java         |   6 +-
 .../seatunnel/apis/base}/command/Command.java      |   4 +-
 .../seatunnel/apis/base/command/CommandArgs.java   |   7 +-
 .../apis/base}/command/CommandBuilder.java         |   6 +-
 .../seatunnel/{ => apis/base}/env/Execution.java   |  10 +-
 .../seatunnel/{ => apis/base}/env/RuntimeEnv.java  |   2 +-
 .../seatunnel/{ => apis/base}/plugin/Plugin.java   |   4 +-
 .../base}/plugin/PluginClosedException.java        |   2 +-
 .../org/apache/seatunnel/flink/BaseFlinkSink.java  |   2 +-
 .../apache/seatunnel/flink/BaseFlinkSource.java    |   2 +-
 .../apache/seatunnel/flink/BaseFlinkTransform.java |   2 +-
 .../apache/seatunnel/flink/FlinkEnvironment.java   |   2 +-
 .../seatunnel/flink/batch/FlinkBatchExecution.java |   2 +-
 .../flink/stream/FlinkStreamExecution.java         |   4 +-
 .../org/apache/seatunnel/spark/BaseSparkSink.java  |   2 +-
 .../apache/seatunnel/spark/BaseSparkSource.java    |   2 +-
 .../apache/seatunnel/spark/BaseSparkTransform.java |   2 +-
 .../apache/seatunnel/spark/SparkEnvironment.java   |   6 +-
 .../seatunnel/spark/batch/SparkBatchExecution.java |   2 +-
 .../StructuredStreamingExecution.java              |   2 +-
 .../spark/stream/SparkStreamingExecution.scala     |   4 +-
 .../apache/seatunnel/spark/iceberg/Config.scala    |  28 +++--
 .../seatunnel/spark/iceberg/sink/Iceberg.scala     |   9 +-
 .../seatunnel/spark/iceberg/source/Iceberg.scala   |  11 +-
 seatunnel-core/seatunnel-core-base/pom.xml         |   8 +-
 .../apache/seatunnel/command/CommandFactory.java   |  45 --------
 .../seatunnel/{ => core/base}/Seatunnel.java       |  23 ++---
 .../apache/seatunnel/{ => core/base}/Starter.java  |   2 +-
 .../base}/command/AbstractCommandArgs.java         |  13 +--
 .../base}/command/BaseTaskExecuteCommand.java      |  22 ++--
 .../seatunnel/{ => core/base}/command/Command.java |  10 +-
 .../{ => core/base}/command/CommandBuilder.java    |   6 +-
 .../base}/command/DeployModeConverter.java         |   2 +-
 .../{ => core/base}/config/ConfigBuilder.java      |   4 +-
 .../{ => core/base}/config/EngineType.java         |   2 +-
 .../{ => core/base}/config/EnvironmentFactory.java |   4 +-
 .../{ => core/base}/config/ExecutionContext.java   |  10 +-
 .../{ => core/base}/config/ExecutionFactory.java   |  12 +--
 .../{ => core/base}/config/PluginFactory.java      |   6 +-
 .../{ => core/base}/config/PluginType.java         |   2 +-
 .../{ => core/base}/utils/AsciiArtUtils.java       |   2 +-
 .../{ => core/base}/utils/CompressionUtils.java    |   2 +-
 .../seatunnel/{ => core/base}/utils/FileUtils.java |   4 +-
 .../seatunnel/command/CommandFactoryTest.java      |  64 ------------
 .../base}/command/BaseTaskExecuteCommandTest.java  |  11 +-
 .../base}/utils/CompressionUtilsTest.java          |   2 +-
 .../{ => core/base}/utils/FileUtilsTest.java       |  16 ++-
 seatunnel-core/seatunnel-core-flink-sql/pom.xml    |   2 +-
 .../src/main/bin/start-seatunnel-sql.sh            |  99 +++++-------------
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java |  56 ++++++++++
 .../apache/seatunnel/core/sql/SeatunnelSql.java    |   7 +-
 .../core/sql/SqlVariableSubstitutionTest.java      |   7 +-
 .../src/main/bin/start-seatunnel-flink.sh          |   9 +-
 .../java/org/apache/seatunnel/FlinkStarter.java    | 115 ---------------------
 .../apache/seatunnel/core/flink/FlinkStarter.java  |  64 ++++++++++++
 .../seatunnel/{ => core/flink}/SeatunnelFlink.java |  18 ++--
 .../core/flink/args}/FlinkCommandArgs.java         |  22 +++-
 .../core/flink/command}/FlinkCommandBuilder.java   |  16 ++-
 .../flink/command}/FlinkConfValidateCommand.java   |  18 ++--
 .../flink/command}/FlinkTaskExecuteCommand.java    |  38 ++++---
 .../seatunnel/core/flink/config/FlinkJobType.java} |  12 ++-
 .../seatunnel/core/flink}/config/FlinkRunMode.java |   2 +-
 .../core/flink/constant/FlinkConstant.java         |   5 +-
 .../core/flink/utils/CommandLineUtils.java         |  87 ++++++++++++++++
 .../{ => core/flink}/FlinkStarterTest.java         |   2 +-
 .../core/flink/args}/FlinkCommandArgsTest.java     |   3 +-
 .../core/flink/utils/CommandLineUtilsTest.java     |  66 ++++++++++++
 seatunnel-core/seatunnel-core-spark/pom.xml        |   6 ++
 .../src/main/bin/start-seatunnel-spark.sh          |   9 +-
 .../seatunnel/{ => core/spark}/SeatunnelSpark.java |  15 ++-
 .../seatunnel/{ => core/spark}/SparkStarter.java   |  27 ++---
 .../core/spark/args}/SparkCommandArgs.java         |   6 +-
 .../core/spark/command}/SparkCommandBuilder.java   |  16 ++-
 .../spark/command}/SparkConfValidateCommand.java   |  19 ++--
 .../spark/command}/SparkTaskExecuteCommand.java    |  38 ++++---
 .../core/spark}/utils/CommandLineUtils.java        |  18 +---
 .../{ => core/spark}/SparkStarterTest.java         |   2 +-
 .../core/spark/args/SparkCommandArgsTest.java}     |   5 +-
 .../core/spark}/utils/CommandLineUtilsTest.java    |   5 +-
 .../apache/seatunnel/e2e/flink/FlinkContainer.java |   2 +-
 .../apache/seatunnel/e2e/spark/SparkContainer.java |   2 +-
 .../seatunnel/example/flink/LocalFlinkExample.java |  29 ++++--
 .../seatunnel/example/flink/LocalSqlExample.java   |  18 ++--
 .../seatunnel/example/spark/LocalSparkExample.java |  28 +++--
 .../flink/transform/DataStreamToTable.java         |   2 +-
 .../flink/transform/TableToDataStream.java         |   2 +-
 .../seatunnel-transforms-spark/pom.xml             |   1 +
 .../apache/seatunnel/spark/transform/Json.scala    |  22 ++--
 .../seatunnel/spark/transform/JsonConfig.scala     |  18 ++--
 .../apache/seatunnel/spark/transform/Replace.scala |  26 +++--
 .../seatunnel/spark/transform/ReplaceConfig.scala  |  21 ++--
 .../seatunnel/spark/transform/TestReplace.scala    |   1 +
 .../apache/seatunnel/spark/transform/Split.scala   |  24 +++--
 .../seatunnel/spark/transform/SplitConfig.scala    |  18 ++--
 .../org/apache/seatunnel/spark/transform/Sql.scala |   1 +
 .../seatunnel-transform-spark-uuid}/pom.xml        |  17 ++-
 .../org.apache.seatunnel.spark.BaseSparkTransform  |  25 +----
 .../apache/seatunnel/spark/transform/UUID.scala}   |  71 +++++++------
 .../seatunnel/spark/transform/UUIDConfig.scala     |  18 ++--
 .../seatunnel/spark/transform/TestUUID.scala}      |  31 +++---
 .../flink/serialization/FlinkRowSerialization.java |   1 +
 .../flink/types/FlinkTypeConverter.java            |  16 +++
 .../translation/flink/types/StringConverter.java   |  14 +++
 127 files changed, 1231 insertions(+), 769 deletions(-)

diff --git a/docs/en/concept/config.md b/docs/en/concept/config.md
new file mode 100644
index 00000000..533c3a5a
--- /dev/null
+++ b/docs/en/concept/config.md
@@ -0,0 +1,108 @@
+---
+sidebar_position: 2
+---
+
+# Intro to config file
+
+In SeaTunnel, the most important thing is the Config file, through which users can customize their own data
+synchronization requirements to maximize the potential of SeaTunnel. This document introduces how to
+configure the Config file.
+
+## Example
+
+Before you read on, you can find config file
+examples [here](https://github.com/apache/incubator-seatunnel/tree/dev/config) and in the distribution package's
+config directory.
+
+## Config file structure
+
+The Config file will be similar to the one below.
+
+```hocon
+env {
+  execution.parallelism = 1
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    field_name = "name,age"
+  }
+}
+
+transform {
+  sql {
+    sql = "select name,age from fake"
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "seatunnel_console"
+    fields = ["name"]
+    username = "default"
+    password = ""
+  }
+}
+```
+
+As you can see, the Config file contains several sections: env, source, transform, sink. Different modules
+have different functions. After you understand these modules, you will understand how SeaTunnel works.
+
+### env
+
+Used to set optional engine parameters. No matter which engine is used (Spark or Flink), the corresponding
+optional parameters should be filled in here.
+
+<!-- TODO add supported env parameters -->
+
+### source
+
+The source section defines where SeaTunnel fetches data from; the fetched data is then used by the next step.
+Multiple sources can be defined at the same time. For the currently supported sources,
+see [Source of SeaTunnel](../connector/source). Each source has its own specific parameters that define how to
+fetch data, and SeaTunnel also extracts the parameters that every source uses, such as
+the `result_table_name` parameter, which specifies the name of the data generated by the current
+source so that other modules can refer to it later.
+
+### transform
+
+When we have the data source, we may need to further process the data, so we have the transform module. Of
+course, this uses the word 'may', which means that we can also directly treat the transform as non-existent,
+directly from source to sink. Like below.
+
+```hocon
+transform {
+  // nothing here
+}
+```
+
+Like source, each transform has its own specific parameters.
+For the currently supported transforms, see [Transform of SeaTunnel](../transform).
+
+### sink
+
+Our purpose with SeaTunnel is to synchronize data from one place to another, so it is critical to define how
+and where data is written. With the sink module provided by SeaTunnel, you can complete this operation quickly
+and efficiently. Sink and source are very similar, but the difference is reading and writing. So go check out
+our [supported sinks](../connector/sink).
+
+### Other
+
+When multiple sources and multiple sinks are defined, which data does each sink read, and
+which data does each transform read? This is determined by two key configurations: `result_table_name` and
+`source_table_name`. Each source module is configured with a `result_table_name` to indicate the name of the
+data set generated by that source, and other transform and sink modules can use `source_table_name` to
+refer to the corresponding data set name, indicating that they want to read that data for processing. Then
+transform, as an intermediate processing module, can use both `result_table_name` and `source_table_name`
+configurations at the same time. But you will find that in the above example Config, not every module is
+configured with these two parameters, because in SeaTunnel, there is a default convention, if these two
+parameters are not configured, then the generated data from the last module of the previous node will be used.
+This is much more convenient when there is only one source.
+
+## What's More
+
+If you want to know the details of this configuration format, please
+see [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md).
diff --git a/docs/en/connector/config-example.md b/docs/en/connector/config-example.md
deleted file mode 100644
index e5e21e7f..00000000
--- a/docs/en/connector/config-example.md
+++ /dev/null
@@ -1,8 +0,0 @@
-# Config Examples
-
-This section show you the example about SeaTunnel configuration file, we already have exists useful examples in
-[example-config](https://github.com/apache/incubator-seatunnel/tree/dev/config)
-
-## What's More
-
-If you want to know the details of this format configuration, Please see [HOCON](https://github.com/lightbend/config/blob/main/HOCON.md).
\ No newline at end of file
diff --git a/docs/en/faq.md b/docs/en/faq.md
index f509f64f..e72d3248 100644
--- a/docs/en/faq.md
+++ b/docs/en/faq.md
@@ -1,11 +1,8 @@
 # FAQ
 
-why-i-should-install-computing-engine-like-spark-or-flink
-
 ## Why I should install computing engine like Spark or Flink
 
-<!-- We should add the reason -->
-TODO
+Currently, SeaTunnel uses computing engines such as Spark and Flink to handle resource scheduling and node communication, so we can focus on the ease of use of data synchronization and the development of high-performance components. But this is only temporary.
 
 ## I have a question, but I can not solve it by myself
 
diff --git a/docs/en/start/local.mdx b/docs/en/start/local.mdx
index 0e57c4b0..71ee455f 100644
--- a/docs/en/start/local.mdx
+++ b/docs/en/start/local.mdx
@@ -11,7 +11,7 @@ import TabItem from '@theme/TabItem';
 
 Before you getting start the local run, you need to make sure you already have installed the following software which SeaTunnel required:
 
-* [Java](https://www.java.com/en/download/) (only JDK 8 supported by now) installed and `JAVA_HOME` set.
+* [Java](https://www.java.com/en/download/) (Java 8 or 11, other versions greater than Java 8 can theoretically work as well) installed and `JAVA_HOME` set.
 * Download the engine, you can choose and download one of them from below as your favour, you could see more information about [why we need engine in SeaTunnel](../faq.md#why-i-should-install-computing-engine-like-spark-or-flink)
   * Spark: Please [download Spark](https://spark.apache.org/downloads.html) first(**required version >= 2** and version < 3.x). For more information you could
   see [Getting Started: standalone](https://spark.apache.org/docs/latest/spark-standalone.html#installing-spark-standalone-to-a-cluster)
diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md
index 8286007c..ecfce6dd 100644
--- a/docs/en/transform/replace.md
+++ b/docs/en/transform/replace.md
@@ -1,4 +1,4 @@
-# Json
+# Replace
 
 ## Description
 
@@ -33,13 +33,13 @@ The name of the field to replaced.
 
 The string to match.
 
-### is_regex [string]
+### replacement [string]
 
-Whether or not to interpret the pattern as a regex (true) or string literal (false).
+The replacement pattern (is_regex is true) or string literal (is_regex is false).
 
-### replacement [boolean]
+### is_regex [boolean]
 
-The replacement pattern (is_regex is true) or string literal (is_regex is false).
+Whether or not to interpret the pattern as a regex (true) or string literal (false).
 
 ### replace_first [boolean]
 
diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md
new file mode 100644
index 00000000..4633a843
--- /dev/null
+++ b/docs/en/transform/uuid.md
@@ -0,0 +1,62 @@
+# UUID
+
+## Description
+
+Generate a universally unique identifier on a specified field.
+
+:::tip
+
+This transform is **ONLY** supported by Spark.
+
+:::
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| fields         | string | yes      | -             |
+| prefix         | string | no       | -             |
+| secure         | boolean| no       | false         |
+
+### fields [string]
+
+The name of the field to generate.
+
+### prefix [string]
+
+The prefix string constant to prepend to each generated UUID.
+
+### secure [boolean]
+
+The cryptographically secure algorithm can be comparatively slow.
+The non-secure algorithm uses a secure random seed but is otherwise deterministic.
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details
+
+## Examples
+
+```bash
+  UUID {
+    fields = "u"
+    prefix = "uuid-"
+    secure = true
+  }
+}
+```
+
+Use `UUID` as udf in sql.
+
+```bash
+  UUID {
+    fields = "u"
+    prefix = "uuid-"
+    secure = true
+  }
+
+  # Use the uuid function (confirm that the fake table exists)
+  sql {
+    sql = "select * from (select raw_message, UUID() as info_row from fake) t1"
+  }
+```
diff --git a/docs/sidebars.js b/docs/sidebars.js
index d96db4c6..1ce0f900 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -68,14 +68,20 @@ const sidebars = {
       items: [
         'start/local',
         'start/docker',
-        'start/kubernetes',
+        'start/kubernetes'
+      ],
+    },
+    {
+      type: 'category',
+      label: 'Concept',
+      items: [
+        'concept/config',
       ],
     },
     {
       type: 'category',
       label: 'Connector',
       items: [
-        'connector/config-example',
         {
           type: 'category',
           label: 'Source',
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 96ce0ab1..78330460 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -17,5 +17,9 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+/**
+ * Interface for reading and writing table metadata from SeaTunnel. Each connector needs to contain
+ * the implementation of Catalog.
+ */
 public interface Catalog {
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index e4ec2e7c..1c854230 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -21,26 +21,41 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Represent the table metadata in SeaTunnel.
+ */
 public final class CatalogTable implements Serializable {
+
     private static final long serialVersionUID = 1L;
+
+    /**
+     * Used to identify the table.
+     */
     private final TableIdentifier tableId;
+
+    /**
+     * The table schema metadata.
+     */
     private final TableSchema tableSchema;
+
     private final Map<String, String> options;
+
     private final List<String> partitionKeys;
+
     private final String comment;
 
     public static CatalogTable of(
-            TableIdentifier tableId,
-            TableSchema tableSchema,
-            Map<String, String> options,
-            List<String> partitionKeys,
-            String comment) {
+        TableIdentifier tableId,
+        TableSchema tableSchema,
+        Map<String, String> options,
+        List<String> partitionKeys,
+        String comment) {
         return new CatalogTable(
-                tableId,
-                tableSchema,
-                options,
-                partitionKeys,
-                comment);
+            tableId,
+            tableSchema,
+            options,
+            partitionKeys,
+            comment);
     }
 
     private CatalogTable(
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 2a9ea7d5..01d2a287 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -26,8 +26,14 @@ import java.util.Optional;
 @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
 public abstract class Column {
 
+    /**
+     * column name.
+     */
     protected final String name;
 
+    /**
+     * Data type of the column.
+     */
     protected final DataType dataType;
 
     protected final String comment;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index ac46fd8a..04ca5169 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -20,6 +20,9 @@ package org.apache.seatunnel.api.table.catalog;
 import java.io.Serializable;
 import java.util.List;
 
+/**
+ * Represent a physical table schema.
+ */
 public final class TableSchema implements Serializable {
     private static final long serialVersionUID = 1L;
     private final List<Column> columns;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
new file mode 100644
index 00000000..8135fb35
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -0,0 +1,23 @@
+package org.apache.seatunnel.api.table.type;
+
+public class BasicType<T> implements DataType<T> {
+
+    private final Class<T> typeClass;
+
+    public BasicType(Class<T> typeClass) {
+        if (typeClass == null) {
+            throw new IllegalArgumentException("typeClass cannot be null");
+        }
+        this.typeClass = typeClass;
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return true;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return this.typeClass;
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
similarity index 79%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
index 9d2d403a..5f2be918 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
@@ -17,5 +17,10 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class BooleanType extends BasicType<Boolean> {
+    private static final BooleanType INSTANCE = new BooleanType(Boolean.class);
+
+    private BooleanType(Class<Boolean> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
index 9d2d403a..4cc51c3c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -17,5 +17,13 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+/**
+ * Data type of column in SeaTunnel.
+ */
+public interface DataType<T> {
+
+    boolean isBasicType();
+
+    Class<T> getTypeClass();
+
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
similarity index 78%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
index 9d2d403a..25d1bb95 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
@@ -17,5 +17,19 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class DateTimeType implements DataType {
+
+    public DateTimeType() {
+
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public Class getTypeClass() {
+        return null;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
similarity index 79%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
index 9d2d403a..8ab7c557 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
@@ -17,5 +17,13 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+import java.util.Date;
+
+public class DateType extends BasicType<Date> {
+
+    private static final DateType INSTANCE = new DateType(Date.class);
+
+    private DateType(Class<Date> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
similarity index 80%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
index 9d2d403a..b5ddaa87 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
@@ -17,5 +17,11 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class DoubleType extends BasicType<Double> {
+
+    public static final DoubleType INSTANCE = new DoubleType(Double.class);
+
+    private DoubleType(Class<Double> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
similarity index 80%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
index 9d2d403a..fdcb8abf 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
@@ -17,5 +17,11 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class IntType extends BasicType<Integer> {
+
+    private static final IntType INSTANCE = new IntType(Integer.class);
+
+    private IntType(Class<Integer> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
similarity index 81%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 9d2d403a..9a7fef45 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -17,5 +17,15 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class ListType implements DataType {
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public Class getTypeClass() {
+        return null;
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
similarity index 81%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
index 9d2d403a..894a48b4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
@@ -17,5 +17,11 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class LongType extends BasicType<Long> {
+
+    private static final LongType INSTANCE = new LongType(Long.class);
+
+    private LongType(Class<Long> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
similarity index 81%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
index 9d2d403a..781a906b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
@@ -17,5 +17,11 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class NullType extends BasicType<Void> {
+
+    private static final NullType INSTANCE = new NullType(Void.class);
+
+    private NullType(Class<Void> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
similarity index 80%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
index 9d2d403a..c2247d9f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
@@ -17,5 +17,11 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public interface DataType {
+public class StringType extends BasicType<String> {
+
+    public static final StringType INSTANCE = new StringType(String.class);
+
+    private StringType(Class<String> typeClass) {
+        super(typeClass);
+    }
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSink.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java
similarity index 86%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSink.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java
index 3b3bb295..89f3283e 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSink.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSink.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.apis.base.api;
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 
 /**
  * a base interface indicates a sink plugin which will write data to other system.
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSource.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java
similarity index 86%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSource.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java
index 767ebd8f..18e22e5a 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseSource.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseSource.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.apis.base.api;
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 
 /**
  * a base interface indicates a source plugin which will read data from other system.
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java
similarity index 86%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
copy to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java
index 5f78e00f..14d273f3 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/api/BaseTransform.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.apis.base.api;
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 
 /**
  * a base interface indicates a transform plugin which will do transformations on data.
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java
similarity index 91%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
copy to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java
index 1d99b926..0d4b2d16 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/Command.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.apis.base.command;
 
 /**
  * Command interface.
@@ -23,7 +23,7 @@ package org.apache.seatunnel.command;
  * @param <T> args type
  */
 @FunctionalInterface
-public interface Command<T extends AbstractCommandArgs> {
+public interface Command<T extends CommandArgs> {
 
     /**
      * Execute command
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java
similarity index 87%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
copy to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java
index 96ce0ab1..27118c43 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandArgs.java
@@ -15,7 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.catalog;
+package org.apache.seatunnel.apis.base.command;
 
-public interface Catalog {
+/**
+ * Used to create command.
+ */
+public interface CommandArgs {
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java
similarity index 89%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
copy to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java
index 91f6e361..4eed944b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/command/CommandBuilder.java
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.apis.base.command;
 
 @FunctionalInterface
-public interface CommandBuilder<T extends AbstractCommandArgs> {
+public interface CommandBuilder<T extends CommandArgs> {
+
     Command<T> buildCommand(T commandArgs);
+
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java
similarity index 84%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java
index 0e1a7caf..73040459 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/Execution.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.env;
+package org.apache.seatunnel.apis.base.env;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 
 import java.util.List;
 
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
similarity index 96%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
index be93251e..e8532a20 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/env/RuntimeEnv.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.env;
+package org.apache.seatunnel.apis.base.env;
 
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
similarity index 95%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
index 6dd2ac44..63370c44 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/Plugin.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.plugin;
+package org.apache.seatunnel.apis.base.plugin;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java
similarity index 96%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
rename to seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java
index 74b5a3f7..1733a0a4 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/base/plugin/PluginClosedException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.plugin;
+package org.apache.seatunnel.apis.base.plugin;
 
 /**
  * an Exception used for the scenes when plugin closed error.
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
index 0daa07bf..67b1c171 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.flink;
 
-import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSink;
 
 /**
  * a base interface indicates a sink plugin running on Flink.
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
index d0134962..20bc2bae 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.flink;
 
-import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseSource;
 
 /**
  * a base interface indicates a source plugin running on Flink.
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java
index fc3dba83..e467916f 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.flink;
 
-import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
 
 /**
  * a base interface indicates a transform plugin running on Flink.
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index bfd42a17..5df4a8be 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.flink;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
-import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.util.ConfigKeyName;
 import org.apache.seatunnel.flink.util.EnvironmentUtil;
 
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index 844414f9..5ea03d31 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.flink.batch;
 
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.env.Execution;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
 
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index 04e55345..43fa7daa 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.flink.stream;
 
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.Plugin;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
index e363cfb5..3c7dea7c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark;
 
-import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
index 47ac1e5f..0e1a24fb 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark;
 
-import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
index a3997d4a..b395ae09 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkTransform.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark;
 
-import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index f2422412..3e2d5e91 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.spark;
 
-import static org.apache.seatunnel.plugin.Plugin.RESULT_TABLE_NAME;
-import static org.apache.seatunnel.plugin.Plugin.SOURCE_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index b94e80a3..890324a6 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark.batch;
 
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.env.Execution;
 import org.apache.seatunnel.spark.BaseSparkTransform;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
index ced69034..d6a9570e 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.spark.structuredstream;
 
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.env.Execution;
 import org.apache.seatunnel.spark.BaseSparkTransform;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 69e5b2cd..3ad85d3c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.seatunnel.spark.stream
 
-import org.apache.seatunnel.env.Execution
-import org.apache.seatunnel.plugin.Plugin
+import org.apache.seatunnel.apis.base.env.Execution
+import org.apache.seatunnel.apis.base.plugin.Plugin
 import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
 import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
 import org.apache.spark.sql.{Dataset, Row}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/Config.scala
similarity index 67%
copy from seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java
copy to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/Config.scala
index fc3dba83..e316a6c0 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/BaseFlinkTransform.java
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/Config.scala
@@ -14,18 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.seatunnel.flink;
-
-import org.apache.seatunnel.apis.BaseTransform;
+package org.apache.seatunnel.spark.iceberg
 
 /**
- * a base interface indicates a transform plugin running on Flink.
+ * Configurations and defaults for Iceberg source and sink
  */
-public interface BaseFlinkTransform extends BaseTransform<FlinkEnvironment> {
+object Config extends Serializable {
+
+  /**
+   * Save mode config
+   */
+  val SAVE_MODE = "saveMode"
 
-    default void registerFunction(FlinkEnvironment flinkEnvironment) {
+  /**
+   * Default save mode value
+   */
+  val SAVE_MODE_DEFAULT = "append"
 
-    }
+  /**
+   * Path config
+   */
+  val PATH = "path"
 
+  /**
+   * Pre sql config
+   */
+  val PRE_SQL = "pre_sql"
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
index ee42b36e..2f15e236 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/sink/Iceberg.scala
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.{ConfigFactory, ConfigValueType}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.iceberg.Config.{PATH, SAVE_MODE, SAVE_MODE_DEFAULT}
 import org.apache.spark.sql.{Dataset, Row}
 
 import scala.collection.JavaConversions._
@@ -39,18 +40,18 @@ class Iceberg extends SparkBatchSink {
           writer.option(e.getKey, config.getString(e.getKey))
       }
     }
-    writer.mode(config.getString("saveMode"))
-      .save(config.getString("path"))
+    writer.mode(config.getString(SAVE_MODE))
+      .save(config.getString(PATH))
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "path")
+    checkAllExists(config, PATH)
   }
 
   override def prepare(prepareEnv: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "saveMode" -> "append"))
+        SAVE_MODE -> SAVE_MODE_DEFAULT))
     config = config.withFallback(defaultConfig)
   }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
index 2bed3f11..ade58126 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-iceberg/src/main/scala/org/apache/seatunnel/spark/iceberg/source/Iceberg.scala
@@ -16,16 +16,19 @@
  */
 package org.apache.seatunnel.spark.iceberg.source
 
+import org.apache.seatunnel.apis.base.plugin.Plugin
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.iceberg.Config.{PATH, PRE_SQL}
 import org.apache.spark.sql.{Dataset, Row}
 
 import scala.collection.JavaConversions._
 
 class Iceberg extends SparkBatchSource {
+
   override def getData(env: SparkEnvironment): Dataset[Row] = {
     val reader = env.getSparkSession.read.format("iceberg")
     for (e <- config.entrySet()) {
@@ -38,13 +41,13 @@ class Iceberg extends SparkBatchSource {
           reader.option(e.getKey, config.getString(e.getKey))
       }
     }
-    val df = reader.load(config.getString("path"))
-    df.createOrReplaceTempView(config.getString("result_table_name"))
-    env.getSparkSession.sql(config.getString("pre_sql"))
+    val df = reader.load(config.getString(PATH))
+    df.createOrReplaceTempView(config.getString(Plugin.RESULT_TABLE_NAME))
+    env.getSparkSession.sql(config.getString(PRE_SQL))
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "path", "pre_sql")
+    checkAllExists(config, PATH, PRE_SQL)
   }
 
   override def getPluginName: String = "Iceberg"
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml b/seatunnel-core/seatunnel-core-base/pom.xml
index 11e6f2e7..1248fb91 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-core/seatunnel-core-base/pom.xml
@@ -34,7 +34,7 @@
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api-spark</artifactId>
+            <artifactId>seatunnel-api-base</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -44,6 +44,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.beust</groupId>
             <artifactId>jcommander</artifactId>
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
deleted file mode 100644
index d465fa76..00000000
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.command;
-
-import org.apache.seatunnel.command.flink.FlinkCommandBuilder;
-import org.apache.seatunnel.command.spark.SparkCommandBuilder;
-
-public class CommandFactory {
-
-    private CommandFactory() {
-    }
-
-    /**
-     * Create seatunnel command.
-     *
-     * @param commandArgs command args.
-     * @return Special command.s
-     */
-    @SuppressWarnings("unchecked")
-    public static <T extends AbstractCommandArgs> Command<T> createCommand(T commandArgs) {
-        switch (commandArgs.getEngineType()) {
-            case FLINK:
-                return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
-            case SPARK:
-                return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
-            default:
-                throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
similarity index 77%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index 2116cf4a..644df716 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -15,13 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.base;
 
-import org.apache.seatunnel.command.AbstractCommandArgs;
-import org.apache.seatunnel.command.Command;
-import org.apache.seatunnel.command.CommandFactory;
-import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.apis.base.command.CommandArgs;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.core.base.command.Command;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -33,19 +31,12 @@ public class Seatunnel {
     /**
      * This method is the entrypoint of SeaTunnel.
      *
-     * @param commandArgs commandArgs
-     * @param <T> commandType
+     * @param command the command to execute
+     * @param <T>     command args type
      */
-    public static <T extends AbstractCommandArgs> void run(T commandArgs) {
-
-        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
-            throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
-        }
-
+    public static <T extends CommandArgs> void run(Command<T> command) {
         try {
-            Command<T> command = CommandFactory.createCommand(commandArgs);
-            command.execute(commandArgs);
+            command.execute();
         } catch (ConfigRuntimeException e) {
             showConfigError(e);
             throw e;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Starter.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Starter.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Starter.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Starter.java
index d109ae3c..e5d2c2fe 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Starter.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Starter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.base;
 
 import java.util.List;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
similarity index 85%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/AbstractCommandArgs.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 9f3e5120..7f46fb03 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -15,25 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
 
+import org.apache.seatunnel.apis.base.command.CommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.EngineType;
+import org.apache.seatunnel.core.base.config.EngineType;
 
 import com.beust.jcommander.Parameter;
 
 import java.util.Collections;
 import java.util.List;
 
-public abstract class AbstractCommandArgs {
+public abstract class AbstractCommandArgs implements CommandArgs {
 
     @Parameter(names = {"-c", "--config"},
-            description = "Config file",
-            required = true)
+        description = "Config file",
+        required = true)
     private String configFile;
 
     @Parameter(names = {"-i", "--variable"},
-            description = "variable substitution, such as -i city=beijing, or -i date=20190318")
+        description = "variable substitution, such as -i city=beijing, or -i date=20190318")
     private List<String> variables = Collections.emptyList();
 
     @Parameter(names = {"-t", "--check"},
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
similarity index 87%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
index 8dd95902..bab835e2 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.apis.base.plugin.PluginClosedException;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-import org.apache.seatunnel.plugin.PluginClosedException;
-import org.apache.seatunnel.utils.AsciiArtUtils;
-import org.apache.seatunnel.utils.CompressionUtils;
+import org.apache.seatunnel.core.base.utils.AsciiArtUtils;
+import org.apache.seatunnel.core.base.utils.CompressionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,11 +36,7 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Base task execute command. More details see:
- * <ul>
- *     <li>{@link org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand}</li>
- *     <li>{@link org.apache.seatunnel.command.spark.SparkTaskExecuteCommand}</li>
- * </ul>
+ * Base task execute command.
  *
  * @param <T> command args.
  */
@@ -60,7 +56,7 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
     }
 
     /**
-     * Execute prepare method defined in {@link org.apache.seatunnel.plugin.Plugin}.
+     * Execute prepare method defined in {@link Plugin}.
      *
      * @param env     runtimeEnv
      * @param plugins plugin list
@@ -73,7 +69,7 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
     }
 
     /**
-     * Execute close method defined in {@link org.apache.seatunnel.plugin.Plugin}
+     * Execute close method defined in {@link Plugin}.
      *
      * @param plugins plugin list
      */
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
similarity index 83%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
index 1d99b926..5cd80fc5 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
+
+import org.apache.seatunnel.apis.base.command.CommandArgs;
 
 /**
  * Command interface.
@@ -23,13 +25,11 @@ package org.apache.seatunnel.command;
  * @param <T> args type
  */
 @FunctionalInterface
-public interface Command<T extends AbstractCommandArgs> {
+public interface Command<T extends CommandArgs> {
 
     /**
      * Execute command
-     *
-     * @param commandArgs args
      */
-    void execute(T commandArgs);
+    void execute();
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/CommandBuilder.java
similarity index 84%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/CommandBuilder.java
index 91f6e361..09e4e83f 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/CommandBuilder.java
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
+
+import org.apache.seatunnel.apis.base.command.CommandArgs;
 
 @FunctionalInterface
-public interface CommandBuilder<T extends AbstractCommandArgs> {
+public interface CommandBuilder<T extends CommandArgs> {
     Command<T> buildCommand(T commandArgs);
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/DeployModeConverter.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/DeployModeConverter.java
index ff7cc76f..8c9f629e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/DeployModeConverter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
 
 import org.apache.seatunnel.common.config.DeployMode;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 4e9d05b9..605613b0 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
-import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EngineType.java
similarity index 95%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EngineType.java
index 5b8d2397..bb8f4ddd 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EngineType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
 public enum EngineType {
     SPARK("spark"),
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EnvironmentFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
similarity index 97%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EnvironmentFactory.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index 3ddf6d63..ff450836 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EnvironmentFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionContext.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
similarity index 90%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionContext.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
index 7a716a71..7e91a368 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionContext.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
similarity index 91%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionFactory.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
index 8dcd6124..62b807d1 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ExecutionFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchExecution;
 import org.apache.seatunnel.flink.stream.FlinkStreamExecution;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
index a25babbd..82b51ef1 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.BaseFlinkSink;
 import org.apache.seatunnel.flink.BaseFlinkSource;
 import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.plugin.Plugin;
 import org.apache.seatunnel.spark.BaseSparkSink;
 import org.apache.seatunnel.spark.BaseSparkSource;
 import org.apache.seatunnel.spark.BaseSparkTransform;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
similarity index 95%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
index e8c0705b..f5f30815 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/PluginType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.base.config;
 
 public enum PluginType {
     SOURCE("source"), TRANSFORM("transform"), SINK("sink");
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
index 780cd7eb..326d5d37 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/AsciiArtUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.base.utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
similarity index 99%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
index 4007a971..f3c2d45e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.base.utils;
 
 import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.commons.compress.archivers.ArchiveStreamFactory;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/FileUtils.java
similarity index 95%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/FileUtils.java
index cd2d4975..edc15b17 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/FileUtils.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.base.utils;
 
-import org.apache.seatunnel.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
 
 import java.io.File;
 import java.nio.file.Path;
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
deleted file mode 100644
index a5bed877..00000000
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.command;
-
-import org.apache.seatunnel.command.flink.FlinkConfValidateCommand;
-import org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand;
-import org.apache.seatunnel.command.spark.SparkConfValidateCommand;
-import org.apache.seatunnel.command.spark.SparkTaskExecuteCommand;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class CommandFactoryTest {
-
-    @Test
-    public void testCreateSparkConfValidateCommand() {
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        sparkCommandArgs.setCheckConfig(true);
-        Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
-        Assert.assertEquals(SparkConfValidateCommand.class, sparkCommand.getClass());
-    }
-
-    @Test
-    public void testCreateSparkExecuteTaskCommand() {
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        sparkCommandArgs.setCheckConfig(false);
-        Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
-        Assert.assertEquals(SparkTaskExecuteCommand.class, sparkCommand.getClass());
-    }
-
-    @Test
-    public void testCreateFlinkConfValidateCommand() {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        flinkCommandArgs.setCheckConfig(true);
-
-        Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
-        Assert.assertEquals(FlinkConfValidateCommand.class, flinkCommand.getClass());
-    }
-
-    @Test
-    public void testCreateFlinkExecuteTaskCommand() {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        flinkCommandArgs.setCheckConfig(false);
-
-        Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
-        Assert.assertEquals(FlinkTaskExecuteCommand.class, flinkCommand.getClass());
-
-    }
-}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommandTest.java
similarity index 93%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommandTest.java
index bec6836c..e1275fab 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommandTest.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.base.command;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.apis.base.plugin.PluginClosedException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.plugin.Plugin;
-import org.apache.seatunnel.plugin.PluginClosedException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -110,12 +110,11 @@ public class BaseTaskExecuteCommandTest {
 
     }
 
-    private static class MockTaskExecutorCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+    private static class MockTaskExecutorCommand extends BaseTaskExecuteCommand<AbstractCommandArgs, FlinkEnvironment> {
 
         @Override
-        public void execute(FlinkCommandArgs commandArgs) {
+        public void execute() {
 
         }
-
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CompressionUtilsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/CompressionUtilsTest.java
similarity index 97%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CompressionUtilsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/CompressionUtilsTest.java
index fc665ddc..9c23fd77 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CompressionUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/CompressionUtilsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.base.utils;
 
 import static org.junit.Assert.assertTrue;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/FileUtilsTest.java
similarity index 79%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/FileUtilsTest.java
index 59e71677..16fbd39e 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/core/base/utils/FileUtilsTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.base.utils;
 
-import org.apache.seatunnel.command.SparkCommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,4 +42,16 @@ public class FileUtilsTest {
         sparkCommandArgs.setDeployMode(DeployMode.CLUSTER);
         Assert.assertEquals("flink.batch.conf", FileUtils.getConfigPath(sparkCommandArgs).toString());
     }
+
+    private static class SparkCommandArgs extends AbstractCommandArgs {
+        private DeployMode deployMode;
+
+        public void setDeployMode(DeployMode deployMode) {
+            this.deployMode = deployMode;
+        }
+
+        public DeployMode getDeployMode() {
+            return deployMode;
+        }
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink-sql/pom.xml b/seatunnel-core/seatunnel-core-flink-sql/pom.xml
index 0824ddae..6078e0ba 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink-sql/pom.xml
@@ -33,7 +33,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-base</artifactId>
+            <artifactId>seatunnel-core-flink</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
index 368db2af..bc0e13fb 100755
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
@@ -16,84 +16,31 @@
 # limitations under the License.
 #
 
-# copy command line arguments
-
-function usage() {
-  echo "Usage: start-seatunnel-sql.sh [options]"
-  echo "  options:"
-  echo "    --config, -c FILE_PATH        Config file"
-  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
-  echo "    --check, -t                   Check config"
-  echo "    --help, -h                    Show this help message"
-}
+set -eu
+APP_DIR=$(cd $(dirname ${0})/../;pwd)
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-core-flink-sql.jar
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
-  usage
-  exit 0
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+    . "${CONF_DIR}/seatunnel-env.sh"
 fi
 
-is_exist() {
-    if [ -z $1 ]; then
-      usage
-      exit -1
-    fi
-}
-
-PARAMS=""
-while (( "$#" )); do
-  case "$1" in
-    -c|--config)
-      CONFIG_FILE=$2
-      is_exist ${CONFIG_FILE}
-      shift 2
-      ;;
-
-    -i|--variable)
-      variable=$2
-      is_exist ${variable}
-      java_property_value="-D${variable}"
-      variables_substitution="${java_property_value} ${variables_substitution}"
-      shift 2
-      ;;
-
-    *) # preserve positional arguments
-      PARAMS="$PARAMS $1"
-      shift
-      ;;
-
-  esac
-done
-
-if [ -z ${CONFIG_FILE} ]; then
-  echo "Error: The following option is required: [-c | --config]"
-  usage
-  exit -1
-elif [ ! -f ${CONFIG_FILE} ];then
-  echo "Error: Config file ${CONFIG_FILE} does not exists! Please check it."
-  exit -1
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
 fi
 
-# set positional arguments in their proper place
-eval set -- "$PARAMS"
-
-BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-APP_DIR=$(dirname ${BIN_DIR})
-CONF_DIR=${APP_DIR}/config
-PLUGINS_DIR=${APP_DIR}/lib
-DEFAULT_CONFIG=${CONF_DIR}/application.conf
-CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}
-
-assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink-sql*.jar)
-
-source ${CONF_DIR}/seatunnel-env.sh
-
-string_trim() {
-    echo $1 | awk '{$1=$1;print}'
-}
-
-export JVM_ARGS=$(string_trim "${variables_substitution}")
-
-exec ${FLINK_HOME}/bin/flink run \
-    ${PARAMS} \
-    -c org.apache.seatunnel.core.sql.SeatunnelSql \
-    ${assemblyJarName} --config ${CONFIG_FILE}
+CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.sql.FlinkSqlStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+    # print usage
+    echo ${CMD}
+    exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+    echo "Execute SeaTunnel Flink SQL Job: ${CMD}"
+    eval ${CMD}
+else
+    echo ${CMD}
+    exit ${EXIT_CODE}
+fi
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
new file mode 100644
index 00000000..1e5f52a4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import java.util.List;
+
+/**
+ * Entry point that prints the flink command line used to submit a SeaTunnel SQL job.
+ */
+public class FlinkSqlStarter implements Starter {
+
+    private static final String CLASS_NAME = SeatunnelSql.class.getName();
+    private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
+
+    /**
+     * Arguments parsed from the command line.
+     */
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    /**
+     * Path of the SeaTunnel flink sql job jar.
+     */
+    private final String appJar;
+
+    FlinkSqlStarter(String[] args) {
+        FlinkCommandArgs parsedArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        // Set the deploy mode before resolving the job jar path, which depends on it.
+        Common.setDeployMode(parsedArgs.getDeployMode().getName());
+        this.flinkCommandArgs = parsedArgs;
+        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
+    }
+
+    @Override
+    public List<String> buildCommands() {
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        List<String> commandTokens = new FlinkSqlStarter(args).buildCommands();
+        System.out.println(String.join(" ", commandTokens));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index ea8475b4..47a83268 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -17,10 +17,11 @@
 
 package org.apache.seatunnel.core.sql;
 
-import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.Executor;
 import org.apache.seatunnel.core.sql.job.JobInfo;
-import org.apache.seatunnel.utils.CommandLineUtils;
 
 import org.apache.commons.io.FileUtils;
 
@@ -36,7 +37,7 @@ public class SeatunnelSql {
     }
 
     private static JobInfo parseJob(String[] args) throws IOException {
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index 95a0354e..03891ab4 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.seatunnel.core.sql;
 
-import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.JobInfo;
-import org.apache.seatunnel.utils.CommandLineUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
@@ -36,7 +37,7 @@ public class SqlVariableSubstitutionTest {
         String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
             "-t", "-i", "table_name=events", "-i", "table_name2=print_table"};
 
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
index 95160a6b..05fe96b3 100755
--- a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
+++ b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
@@ -25,7 +25,14 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
     . "${CONF_DIR}/seatunnel-env.sh"
 fi
 
-CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.FlinkStarter ${@}) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.flink.FlinkStarter ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
 if [ ${EXIT_CODE} -eq 234 ]; then
     # print usage
     echo ${CMD}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
deleted file mode 100644
index 16006fbb..00000000
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel;
-
-import org.apache.seatunnel.command.FlinkCommandArgs;
-import org.apache.seatunnel.common.config.Common;
-
-import com.beust.jcommander.JCommander;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
- */
-public class FlinkStarter implements Starter {
-
-    private static final String APP_NAME = SeatunnelFlink.class.getName();
-    private static final int USAGE_EXIT_CODE = 234;
-    private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
-    private static final String RUN_MODE_RUN = "run";
-    private static final String RUN_MODE_APPLICATION = "run-application";
-
-    /**
-     * Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
-     */
-    private final List<String> flinkParams = new ArrayList<>();
-
-    /**
-     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
-     */
-    private final FlinkCommandArgs flinkCommandArgs;
-
-    /**
-     * SeaTunnel flink job jar.
-     */
-    private final String appJar;
-
-    FlinkStarter(String[] args) {
-        this.flinkCommandArgs = parseArgs(args);
-        // set the deployment mode, used to get the job jar path.
-        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
-        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
-    }
-
-    @SuppressWarnings("checkstyle:RegexpSingleline")
-    public static void main(String[] args) {
-        FlinkStarter flinkStarter = new FlinkStarter(args);
-        List<String> command = flinkStarter.buildCommands();
-        String finalFLinkCommand = String.join(" ", command);
-        System.out.println(finalFLinkCommand);
-    }
-
-    /**
-     * Parse seatunnel args.
-     *
-     * @param args args
-     * @return FlinkCommandArgs
-     */
-    private FlinkCommandArgs parseArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander jCommander = JCommander.newBuilder()
-            .programName("start-seatunnel-flink.sh")
-            .addObject(flinkCommandArgs)
-            .acceptUnknownOptions(true)
-            .args(args)
-            .build();
-        // The args is not belongs to seatunnel, add into flink params
-        flinkParams.addAll(jCommander.getUnknownOptions());
-        if (flinkCommandArgs.isHelp()) {
-            jCommander.usage();
-            System.exit(USAGE_EXIT_CODE);
-        }
-        return flinkCommandArgs;
-    }
-
-    @Override
-    public List<String> buildCommands() {
-        List<String> command = new ArrayList<>();
-        command.add("${FLINK_HOME}/bin/flink");
-        command.add(flinkCommandArgs.getRunMode().getMode());
-        command.addAll(flinkParams);
-        command.add("-c");
-        command.add(APP_NAME);
-        command.add(appJar);
-        command.add("--config");
-        command.add(flinkCommandArgs.getConfigFile());
-        if (flinkCommandArgs.isCheckConfig()) {
-            command.add("--check");
-        }
-        // set System properties
-        flinkCommandArgs.getVariables().stream()
-            .filter(Objects::nonNull)
-            .map(String::trim)
-            .forEach(variable -> command.add("-D" + variable));
-        return command;
-    }
-
-}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
new file mode 100644
index 00000000..6fe18a49
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import java.util.List;
+
+/**
+ * Entry point that prints the flink command line used to submit the SeaTunnel job jar.
+ */
+public class FlinkStarter implements Starter {
+
+    private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
+    private static final String APP_NAME = SeatunnelFlink.class.getName();
+
+    /**
+     * Arguments used by the SeaTunnel application itself, e.g. `-c config.conf`.
+     */
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    /**
+     * Path of the SeaTunnel flink job jar.
+     */
+    private final String appJar;
+
+    FlinkStarter(String[] args) {
+        FlinkCommandArgs parsedArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        // Set the deploy mode before resolving the job jar path, which depends on it.
+        Common.setDeployMode(parsedArgs.getDeployMode().getName());
+        this.flinkCommandArgs = parsedArgs;
+        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
+    }
+
+    @Override
+    public List<String> buildCommands() {
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        List<String> commandTokens = new FlinkStarter(args).buildCommands();
+        System.out.println(String.join(" ", commandTokens));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
similarity index 54%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index c240a432..45792d00 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -15,16 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.flink;
 
-import org.apache.seatunnel.command.FlinkCommandArgs;
-import org.apache.seatunnel.utils.CommandLineUtils;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
-    public static void main(String[] args) throws Exception {
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
-        Seatunnel.run(flinkArgs);
+    public static void main(String[] args) {
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
+            .buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
similarity index 80%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index 16d3b34a..fb892f9c 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -15,15 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.flink.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.EngineType;
-import org.apache.seatunnel.config.FlinkRunMode;
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
+import java.util.List;
+
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
@@ -31,6 +34,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
 
+    /**
+     * Options not recognized by SeaTunnel are stored here and passed on as flink command parameters.
+     */
+    private List<String> flinkParams;
+
     @Override
     public EngineType getEngineType() {
         return EngineType.FLINK;
@@ -49,6 +57,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.runMode = runMode;
     }
 
+    public List<String> getFlinkParams() {
+        return flinkParams;
+    }
+
+    public void setFlinkParams(List<String> flinkParams) {
+        this.flinkParams = flinkParams;
+    }
+
     /**
      * Used to convert the run mode string to the enum value.
      */
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
similarity index 64%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 5eedfdc2..595f01a5 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -15,16 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.flink;
+package org.apache.seatunnel.core.flink.command;
 
-import org.apache.seatunnel.command.Command;
-import org.apache.seatunnel.command.CommandBuilder;
-import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.command.CommandBuilder;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 
 public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
 
     @Override
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-        return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
+        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+            throw new IllegalArgumentException(
+                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+        }
+        return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand(commandArgs)
+            : new FlinkTaskExecuteCommand(commandArgs);
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
similarity index 74%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
index 85ac71b2..5868e252 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.flink;
+package org.apache.seatunnel.core.flink.command;
 
-import org.apache.seatunnel.command.Command;
-import org.apache.seatunnel.command.FlinkCommandArgs;
-import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.utils.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +35,14 @@ public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
 
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
+        this.flinkCommandArgs = flinkCommandArgs;
+    }
+
     @Override
-    public void execute(FlinkCommandArgs flinkCommandArgs) {
+    public void execute() {
         Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
         new ConfigBuilder<FlinkEnvironment>(configPath, flinkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
similarity index 66%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
index d6863467..32d13440 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.flink;
+package org.apache.seatunnel.core.flink.command;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.command.BaseTaskExecuteCommand;
-import org.apache.seatunnel.command.FlinkCommandArgs;
-import org.apache.seatunnel.config.ConfigBuilder;
-import org.apache.seatunnel.config.EngineType;
-import org.apache.seatunnel.config.ExecutionContext;
-import org.apache.seatunnel.config.ExecutionFactory;
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.ExecutionContext;
+import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -40,8 +40,14 @@ import java.util.List;
  */
 public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
 
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+        this.flinkCommandArgs = flinkCommandArgs;
+    }
+
     @Override
-    public void execute(FlinkCommandArgs flinkCommandArgs) {
+    public void execute() {
         EngineType engine = flinkCommandArgs.getEngineType();
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
@@ -55,9 +61,9 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
         showAsciiLogo();
 
         try (Execution<BaseSource<FlinkEnvironment>,
-                BaseTransform<FlinkEnvironment>,
-                BaseSink<FlinkEnvironment>,
-                FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
+            BaseTransform<FlinkEnvironment>,
+            BaseSink<FlinkEnvironment>,
+            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
similarity index 81%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
index e8c0705b..6abe5f75 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.flink.config;
 
-public enum PluginType {
-    SOURCE("source"), TRANSFORM("transform"), SINK("sink");
+public enum FlinkJobType {
+    JAR("start-seatunnel-flink.sh"),
+    SQL("start-seatunnel-sql.sh"),
+    ;
 
     private final String type;
 
-    PluginType(String type) {
+    FlinkJobType(String type) {
         this.type = type;
     }
 
     public String getType() {
-        return type;
+        return this.type;
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
index 41e46b45..1e8f2246 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/FlinkRunMode.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkRunMode.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config;
+package org.apache.seatunnel.core.flink.config;
 
 /**
  * Flink run mode, used to determine whether to run in local or cluster mode.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
similarity index 86%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
index 96ce0ab1..b57c6293 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
@@ -15,7 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.catalog;
+package org.apache.seatunnel.core.flink.constant;
 
-public interface Catalog {
+/**
+ * Constants shared by the SeaTunnel Flink starters.
+ */
+public final class FlinkConstant {
+
+    /**
+     * Process exit code used after the command line usage has been printed.
+     */
+    public static final int USAGE_EXIT_CODE = 234;
+
+    private FlinkConstant() {
+        throw new UnsupportedOperationException("FlinkConstant is a constant holder and cannot be instantiated");
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
new file mode 100644
index 00000000..cdbf610a
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.utils;
+
+import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Command line helpers shared by the SeaTunnel Flink starters and job entry points.
+ */
+public class CommandLineUtils {
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    }
+
+    /**
+     * Parses the SeaTunnel options in {@code args} into a {@link FlinkCommandArgs}.
+     *
+     * <p>Unlike {@link #parseCommandArgs(String[], FlinkJobType)}, this method does not
+     * enable unknown options and never sets the flink parameters of the result.
+     *
+     * @param args raw command line arguments
+     * @return the populated command arguments
+     */
+    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        JCommander.newBuilder()
+            .addObject(flinkCommandArgs)
+            .build()
+            .parse(args);
+        return flinkCommandArgs;
+    }
+
+    /**
+     * Parses {@code args}; options that SeaTunnel does not declare are stored as flink parameters.
+     *
+     * <p>When the help flag is set, the usage text is printed in Unix style and the JVM exits
+     * with {@code USAGE_EXIT_CODE}.
+     *
+     * @param args    raw command line arguments
+     * @param jobType job type whose {@link FlinkJobType#getType()} is used as the program name
+     * @return the populated command arguments, including the unknown options as flink parameters
+     */
+    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        JCommander jCommander = JCommander.newBuilder()
+            .programName(jobType.getType())
+            .addObject(flinkCommandArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+        // Options that do not belong to SeaTunnel are passed through as flink params.
+        flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
+        if (flinkCommandArgs.isHelp()) {
+            jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+            jCommander.usage();
+            System.exit(USAGE_EXIT_CODE);
+        }
+        return flinkCommandArgs;
+    }
+
+    /**
+     * Builds the tokens of the flink command line that submits a SeaTunnel job.
+     *
+     * <p>The first token is the literal {@code ${FLINK_HOME}/bin/flink}; the placeholder is
+     * not resolved here.
+     *
+     * @param flinkCommandArgs parsed SeaTunnel arguments
+     * @param className        main class passed to flink with {@code -c}
+     * @param jarPath          path of the job jar
+     * @return the command tokens in order
+     */
+    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
+        List<String> command = new ArrayList<>();
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        // flinkParams is only populated by parseCommandArgs, so it may still be null here.
+        List<String> flinkParams = flinkCommandArgs.getFlinkParams();
+        if (flinkParams != null) {
+            command.addAll(flinkParams);
+        }
+        command.add("-c");
+        command.add(className);
+        command.add(jarPath);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+            .filter(Objects::nonNull)
+            .map(String::trim)
+            .forEach(variable -> command.add("-D" + variable));
+        return command;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
rename to seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 4ba5322f..02d23b3c 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.flink;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
rename to seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
index 56e990df..56f4f5e6 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.flink.args;
 
 import com.beust.jcommander.JCommander;
 import org.junit.Assert;
@@ -37,4 +37,5 @@ public class FlinkCommandArgsTest {
         Assert.assertTrue(flinkArgs.isCheckConfig());
         Assert.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
     }
+
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
new file mode 100644
index 00000000..66f1ec69
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.utils;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.config.FlinkRunMode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link CommandLineUtils}: argument parsing and flink command assembly. */
+public class CommandLineUtilsTest {
+
+    /** The same argument vector must parse to the same flink params, run mode and variables for JAR and SQL. */
+    @Test
+    public void testParseCommandArgs() {
+        String[] args = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        // JUnit's signature is assertEquals(expected, actual); this order keeps failure messages truthful.
+        Assert.assertEquals(Arrays.asList("--detached", "--unkown", "unkown-command"), flinkCommandArgs.getFlinkParams());
+        Assert.assertEquals(FlinkRunMode.APPLICATION_RUN, flinkCommandArgs.getRunMode());
+        Assert.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkCommandArgs.getVariables());
+
+        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        Assert.assertEquals(Arrays.asList("--detached", "--unkown", "unkown-command"), flinkCommandArgs.getFlinkParams());
+        Assert.assertEquals(FlinkRunMode.APPLICATION_RUN, flinkCommandArgs.getRunMode());
+        Assert.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkCommandArgs.getVariables());
+    }
+
+    /** The built command must be identical for JAR and SQL: flink params ahead of "-c", -D variables last. */
+    @Test
+    public void testBuildFlinkCommand() {
+        String[] args = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        List<String> expected =
+            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check", "-Dcity=shenyang", "-Ddate=20200202");
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        Assert.assertEquals(expected, commands);
+
+        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        Assert.assertEquals(expected, commands);
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 53ae179d..86410e07 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -74,6 +74,12 @@
             <artifactId>seatunnel-transform-spark-replace</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-uuid</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh b/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
index 4528a68b..a03c79dd 100755
--- a/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
+++ b/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
@@ -24,7 +24,14 @@ if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
     . "${CONF_DIR}/seatunnel-env.sh"
 fi
 
-CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.SparkStarter ${@} | tail -n 1) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.spark.SparkStarter ${args} | tail -n 1) && EXIT_CODE=$? || EXIT_CODE=$?
 if [ ${EXIT_CODE} -eq 234 ]; then
     # print usage
     echo ${CMD}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
similarity index 63%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index 7d6eb829..d88cc7ec 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -15,15 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.spark;
 
-import org.apache.seatunnel.command.SparkCommandArgs;
-import org.apache.seatunnel.utils.CommandLineUtils;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
 
 public class SeatunnelSpark {
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
-        Seatunnel.run(sparkArgs);
+        Command<SparkCommandArgs> sparkCommand =
+            new SparkCommandBuilder().buildCommand(sparkArgs);
+        Seatunnel.run(sparkCommand);
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
similarity index 94%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index f89ce4ba..81f44ba9 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -15,25 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.spark;
 
 import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
 
-import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.ConfigBuilder;
-import org.apache.seatunnel.config.EngineType;
-import org.apache.seatunnel.config.PluginFactory;
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.utils.CompressionUtils;
+import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.utils.CompressionUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
 import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
@@ -132,6 +134,7 @@ public class SparkStarter implements Starter {
                 .args(args)
                 .build();
         if (commandArgs.isHelp()) {
+            commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
             commander.usage();
             System.exit(USAGE_EXIT_CODE);
         }
@@ -160,8 +163,8 @@ public class SparkStarter implements Starter {
                 .filter(pair -> pair.length == 2)
                 .forEach(pair -> System.setProperty(pair[0], pair[1]));
         this.sparkConf = getSparkConf(commandArgs.getConfigFile());
-        String driverJavaOpts = this.sparkConf.get("spark.driver.extraJavaOptions");
-        String executorJavaOpts = this.sparkConf.get("spark.executor.extraJavaOptions");
+        String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
+        String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
         if (!commandArgs.getVariables().isEmpty()) {
             String properties = commandArgs.getVariables()
                     .stream()
@@ -169,8 +172,8 @@ public class SparkStarter implements Starter {
                     .collect(Collectors.joining(" "));
             driverJavaOpts += " " + properties;
             executorJavaOpts += " " + properties;
-            this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts);
-            this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts);
+            this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim());
+            this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim());
         }
     }
 
@@ -240,7 +243,7 @@ public class SparkStarter implements Starter {
     protected List<String> buildFinal() {
         List<String> commands = new ArrayList<>();
         commands.add("${SPARK_HOME}/bin/spark-submit");
-        appendOption(commands, "--class", "org.apache.seatunnel.SeatunnelSpark");
+        appendOption(commands, "--class", SeatunnelSpark.class.getName());
         appendOption(commands, "--name", this.appName);
         appendOption(commands, "--master", this.commandArgs.getMaster());
         appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getName());
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/args/SparkCommandArgs.java
similarity index 87%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/args/SparkCommandArgs.java
index e03faa00..9eb36685 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/args/SparkCommandArgs.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.spark.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.EngineType;
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.command.DeployModeConverter;
+import org.apache.seatunnel.core.base.config.EngineType;
 
 import com.beust.jcommander.Parameter;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
similarity index 64%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index ee3972ed..4ea8ed1e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -15,17 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.spark;
+package org.apache.seatunnel.core.spark.command;
 
-import org.apache.seatunnel.command.Command;
-import org.apache.seatunnel.command.CommandBuilder;
-import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.command.CommandBuilder;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
 public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
 
     @Override
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
-        return commandArgs.isCheckConfig() ? new SparkConfValidateCommand() : new SparkTaskExecuteCommand();
+        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+            throw new IllegalArgumentException(
+                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+        }
+        return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
+            : new SparkTaskExecuteCommand(commandArgs);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
similarity index 74%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 73aafe7b..5629e51d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.spark;
+package org.apache.seatunnel.core.spark.command;
 
-import org.apache.seatunnel.command.Command;
-import org.apache.seatunnel.command.SparkCommandArgs;
-import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.spark.SparkEnvironment;
-import org.apache.seatunnel.utils.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,11 +35,16 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SparkConfValidateCommand.class);
 
+    private final SparkCommandArgs sparkCommandArgs;
+
+    public SparkConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
+        this.sparkCommandArgs = sparkCommandArgs;
+    }
+
     @Override
-    public void execute(SparkCommandArgs sparkCommandArgs) {
+    public void execute() {
         Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
         new ConfigBuilder<SparkEnvironment>(confPath, sparkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
     }
-
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
similarity index 65%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index 3c58da84..c8aebd4b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command.spark;
+package org.apache.seatunnel.core.spark.command;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.command.BaseTaskExecuteCommand;
-import org.apache.seatunnel.command.SparkCommandArgs;
-import org.apache.seatunnel.config.ConfigBuilder;
-import org.apache.seatunnel.config.EngineType;
-import org.apache.seatunnel.config.ExecutionContext;
-import org.apache.seatunnel.config.ExecutionFactory;
-import org.apache.seatunnel.env.Execution;
+import org.apache.seatunnel.apis.base.api.BaseSink;
+import org.apache.seatunnel.apis.base.api.BaseSource;
+import org.apache.seatunnel.apis.base.api.BaseTransform;
+import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.ExecutionContext;
+import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.spark.SparkEnvironment;
-import org.apache.seatunnel.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -37,8 +37,14 @@ import java.util.List;
 
 public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
 
+    private final SparkCommandArgs sparkCommandArgs;
+
+    public SparkTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
+        this.sparkCommandArgs = sparkCommandArgs;
+    }
+
     @Override
-    public void execute(SparkCommandArgs sparkCommandArgs) {
+    public void execute() {
         EngineType engine = sparkCommandArgs.getEngineType();
         Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
 
@@ -53,9 +59,9 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
         showAsciiLogo();
 
         try (Execution<
-                BaseSource<SparkEnvironment>,
-                BaseTransform<SparkEnvironment>,
-                BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
+            BaseSource<SparkEnvironment>,
+            BaseTransform<SparkEnvironment>,
+            BaseSink<SparkEnvironment>, SparkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
             prepare(executionContext.getEnvironment(), sources, transforms, sinks);
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
similarity index 71%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
rename to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
index 540d4ce6..811bddde 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.spark.utils;
 
-import org.apache.seatunnel.command.FlinkCommandArgs;
-import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
 import com.beust.jcommander.JCommander;
 
-public final class CommandLineUtils {
+public class CommandLineUtils {
 
     private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
     }
 
     public static SparkCommandArgs parseSparkArgs(String[] args) {
@@ -35,14 +35,4 @@ public final class CommandLineUtils {
             .parse(args);
         return sparkCommandArgs;
     }
-
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/SparkStarterTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
similarity index 97%
rename from seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/SparkStarterTest.java
rename to seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
index 1aeb5473..20d35b92 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/SparkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel;
+package org.apache.seatunnel.core.spark;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandSparkArgsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
similarity index 96%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandSparkArgsTest.java
rename to seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
index 791a1f63..bb6e8f17 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandSparkArgsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.command;
+package org.apache.seatunnel.core.spark.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
 
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 
-public class CommandSparkArgsTest {
+public class SparkCommandArgsTest {
 
     @Test
     public void testParseSparkArgs() {
@@ -63,4 +63,5 @@ public class CommandSparkArgsTest {
             .build()
             .parse(args);
     }
+
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
similarity index 93%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
rename to seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
index f24fd3a3..a0694b8d 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.core.spark.utils;
 
-import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
 import com.beust.jcommander.ParameterException;
 import org.junit.Assert;
@@ -39,5 +39,4 @@ public class CommandLineUtilsTest {
         String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
         Assert.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
     }
-
 }
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 52019de6..ffa1c991 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -118,7 +118,7 @@ public abstract class FlinkContainer {
         final List<String> command = new ArrayList<>();
         command.add("flink");
         command.add("run");
-        command.add("-c org.apache.seatunnel.SeatunnelFlink " + jar);
+        command.add("-c org.apache.seatunnel.core.flink.SeatunnelFlink " + jar);
         command.add("--config " + conf);
 
         Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index dcd20b3b..b3dd5d16 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -98,7 +98,7 @@ public abstract class SparkContainer {
         final List<String> command = new ArrayList<>();
         command.add("spark-submit");
         command.add("--class");
-        command.add("org.apache.seatunnel.SeatunnelSpark");
+        command.add("org.apache.seatunnel.core.spark.SeatunnelSpark");
         command.add("--name");
         command.add("SeaTunnel");
         command.add("--master");
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index e42c7e01..a740b8ee 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -17,23 +17,34 @@
 
 package org.apache.seatunnel.example.flink;
 
-import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
 
-public class LocalFlinkExample {
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
 
-    public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/";
+public class LocalFlinkExample {
 
-    public static void main(String[] args) {
-        String configFile = getTestConfigFile("fake_to_console.conf");
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+        String configFile = getTestConfigFile("/examples/fake_to_console.conf");
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         flinkCommandArgs.setConfigFile(configFile);
         flinkCommandArgs.setCheckConfig(false);
         flinkCommandArgs.setVariables(null);
-        Seatunnel.run(flinkCommandArgs);
+        Command<FlinkCommandArgs> flinkCommand =
+            new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
+        Seatunnel.run(flinkCommand);
     }
 
-    public static String getTestConfigFile(String configFile) {
-        return System.getProperty("user.dir") + TEST_RESOURCE_DIR + configFile;
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = LocalFlinkExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
     }
 }
diff --git a/seatunnel-examples/seatunnel-flink-sql-examples/src/main/java/org/apache/seatunnel/example/flink/LocalSqlExample.java b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/java/org/apache/seatunnel/example/flink/LocalSqlExample.java
index 60a570b8..4d26efa9 100644
--- a/seatunnel-examples/seatunnel-flink-sql-examples/src/main/java/org/apache/seatunnel/example/flink/LocalSqlExample.java
+++ b/seatunnel-examples/seatunnel-flink-sql-examples/src/main/java/org/apache/seatunnel/example/flink/LocalSqlExample.java
@@ -23,20 +23,26 @@ import org.apache.seatunnel.core.sql.job.JobInfo;
 import org.apache.commons.io.FileUtils;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 
 public class LocalSqlExample {
 
-    public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-flink-sql-examples/src/main/resources/examples/";
-
-    public static void main(String[] args) throws IOException {
-        String configFile = getTestConfigFile("flink.sql.conf.template");
+    public static void main(String[] args) throws IOException, URISyntaxException {
+        String configFile = getTestConfigFile("/examples/flink.sql.conf.template");
         String jobContent = FileUtils.readFileToString(new File(configFile), StandardCharsets.UTF_8);
         Executor.runJob(new JobInfo(jobContent));
     }
 
-    public static String getTestConfigFile(String configFile) {
-        return System.getProperty("user.dir") + TEST_RESOURCE_DIR + configFile;
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = LocalSqlExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Could not find config: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
     }
 }
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index 57ad918e..5469ec73 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -17,25 +17,35 @@
 
 package org.apache.seatunnel.example.spark;
 
-import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.command.SparkCommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
 
-public class LocalSparkExample {
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
 
-    public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/";
+public class LocalSparkExample {
 
-    public static void main(String[] args) {
-        String configFile = getTestConfigFile("spark.batch.conf");
+    public static void main(String[] args) throws URISyntaxException, FileNotFoundException {
+        String configFile = getTestConfigFile("/examples/spark.batch.conf");
         SparkCommandArgs sparkArgs = new SparkCommandArgs();
         sparkArgs.setConfigFile(configFile);
         sparkArgs.setCheckConfig(false);
         sparkArgs.setVariables(null);
         sparkArgs.setDeployMode(DeployMode.CLIENT);
-        Seatunnel.run(sparkArgs);
+        Command<SparkCommandArgs> sparkCommand = new SparkCommandBuilder().buildCommand(sparkArgs);
+        Seatunnel.run(sparkCommand);
     }
 
-    public static String getTestConfigFile(String configFile) {
-        return System.getProperty("user.dir") + TEST_RESOURCE_DIR + configFile;
+    public static String getTestConfigFile(String configFile) throws URISyntaxException, FileNotFoundException {
+        URL resource = LocalSparkExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Could not find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
     }
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
index 834c7c5c..705d21ab 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
+++ b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-datastream2table/src/main/java/org/apache/seatunnel/flink/transform/DataStreamToTable.java
@@ -17,12 +17,12 @@
 
 package org.apache.seatunnel.flink.transform;
 
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
-import org.apache.seatunnel.plugin.Plugin;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
index a504a388..822b47e3 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
+++ b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-table2datastream/src/main/java/org/apache/seatunnel/flink/transform/TableToDataStream.java
@@ -17,13 +17,13 @@
 
 package org.apache.seatunnel.flink.transform;
 
+import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
 import org.apache.seatunnel.flink.util.TableUtil;
-import org.apache.seatunnel.plugin.Plugin;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
index 0626dfc0..e95a4ebc 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
@@ -34,6 +34,7 @@
         <module>seatunnel-transform-spark-json</module>
         <module>seatunnel-transform-spark-split</module>
         <module>seatunnel-transform-spark-replace</module>
+        <module>seatunnel-transform-spark-uuid</module>
         <module>seatunnel-transform-spark-sql</module>
     </modules>
 
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 519be92b..63e50cee 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.JsonConfig._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -40,19 +42,19 @@ class Json extends BaseSparkTransform {
   var useCustomSchema: Boolean = false
 
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
+    val srcField = config.getString(SOURCE_FILED)
     val spark = env.getSparkSession
 
     import spark.implicits._
 
-    config.getString("target_field") match {
+    config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT => {
 
         val jsonRDD = df.select(srcField).as[String].rdd
 
         val newDF = srcField match {
           // for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String]
-          case "raw_message" => {
+          case DEFAULT_SOURCE_FILED => {
             val tmpDF =
               if (this.useCustomSchema) {
                 spark.read.schema(this.customSchema).json(jsonRDD)
@@ -94,14 +96,14 @@ class Json extends BaseSparkTransform {
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT,
-        "schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
-        "schema_file" -> ""))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT,
+        SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
+        SCHEMA_FILE -> DEFAULT_SCHEMA_FILE))
     config = config.withFallback(defaultConfig)
-    val schemaFile = config.getString("schema_file")
+    val schemaFile = config.getString(SCHEMA_FILE)
     if (schemaFile.trim != "") {
-      parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile)
+      parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile)
     }
   }
 
@@ -136,5 +138,5 @@ class Json extends BaseSparkTransform {
     }
   }
 
-  override def getPluginName: String = "json"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
similarity index 71%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
index 5f78e00f..560fce0c 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.spark.transform
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-
-/**
- * a base interface indicates a transform plugin which will do transformations on data.
- */
-public interface BaseTransform<T extends RuntimeEnv> extends Plugin<T> {
+object JsonConfig {
+  val PLUGIN_NAME = "json"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SCHEMA_DIR = "schema_dir"
+  val SCHEMA_FILE = "schema_file"
+  val DEFAULT_SCHEMA_FILE = ""
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
index 609dec15..27e6cf9c 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import scala.collection.JavaConversions._
@@ -26,42 +27,43 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.ReplaceConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
 
 class Replace extends BaseSparkTransform {
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val key = config.getString("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val key = config.getString(FIELDS)
 
     val func: UserDefinedFunction = udf((s: String) => {
       replace(
         s,
-        config.getString("pattern"),
-        config.getString("replacement"),
-        config.getBoolean("is_regex"),
-        config.getBoolean("replace_first"))
+        config.getString(PATTERN),
+        config.getString(REPLACEMENT),
+        config.getBoolean(REPLACE_REGEX),
+        config.getBoolean(REPLACE_FIRST))
     })
     var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
     filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
     val ds = filterDf.drop(Constants.ROW_TMP)
     if (func != null) {
-      env.getSparkSession.udf.register("Replace", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields", "pattern", "replacement")
+    checkAllExists(config, FIELDS, PATTERN, REPLACEMENT)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "is_regex" -> false,
-        "replace_first" -> false))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        REPLACE_REGEX -> DEFAULT_REPLACE_REGEX,
+        REPLACE_FIRST -> DEFAULT_REPLACE_FIRST))
     config = config.withFallback(defaultConfig)
   }
 
@@ -83,4 +85,6 @@ class Replace extends BaseSparkTransform {
   }
 
   implicit def toReg(pattern: String): Regex = pattern.r
+
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
similarity index 65%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
index 5f78e00f..51e15153 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
@@ -15,14 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.spark.transform
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-
-/**
- * a base interface indicates a transform plugin which will do transformations on data.
- */
-public interface BaseTransform<T extends RuntimeEnv> extends Plugin<T> {
+object ReplaceConfig {
+  val PLUGIN_NAME = "replace"
+  val UDF_NAME = "Replace"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val PATTERN = "pattern"
+  val REPLACEMENT = "replacement"
+  val REPLACE_REGEX = "is_regex"
+  val DEFAULT_REPLACE_REGEX = false
+  val REPLACE_FIRST = "replace_first"
+  val DEFAULT_REPLACE_FIRST = false
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
index dddac46a..fdb4287d 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import junit.framework.TestCase.assertEquals
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 498776c0..764c5bab 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import scala.collection.JavaConversions._
@@ -23,6 +24,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.SplitConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
@@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf}
 class Split extends BaseSparkTransform {
 
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val keys = config.getStringList("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val keys = config.getStringList(FIELDS)
 
     // https://stackoverflow.com/a/33345698/1145750
     var func: UserDefinedFunction = null
-    val ds = config.getString("target_field") match {
+    val ds = config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT =>
         func = udf((s: String) => {
-          split(s, config.getString("separator"), keys.size())
+          split(s, config.getString(SPLIT_SEPARATOR), keys.size())
         })
         var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
         for (i <- 0 until keys.size()) {
@@ -47,28 +49,28 @@ class Split extends BaseSparkTransform {
         filterDf.drop(Constants.ROW_TMP)
       case targetField: String =>
         func = udf((s: String) => {
-          val values = split(s, config.getString("separator"), keys.size)
+          val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size)
           val kvs = (keys zip values).toMap
           kvs
         })
         df.withColumn(targetField, func(col(srcField)))
     }
     if (func != null) {
-      env.getSparkSession.udf.register("Split", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields")
+    checkAllExists(config, FIELDS)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "separator" -> " ",
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT))
+        SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR,
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT))
     config = config.withFallback(defaultConfig)
   }
 
@@ -86,5 +88,5 @@ class Split extends BaseSparkTransform {
     filled.toSeq
   }
 
-  override def getPluginName: String = "split"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
similarity index 70%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
index 5f78e00f..b23036ab 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.spark.transform
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-
-/**
- * a base interface indicates a transform plugin which will do transformations on data.
- */
-public interface BaseTransform<T extends RuntimeEnv> extends Plugin<T> {
+object SplitConfig {
+  val PLUGIN_NAME = "split"
+  val UDF_NAME = "Split"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SPLIT_SEPARATOR = "separator"
+  val DEFAULT_SPLIT_SEPARATOR = " "
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 2881b1a2..b15a4dd5 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
diff --git a/seatunnel-core/seatunnel-core-base/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
similarity index 81%
copy from seatunnel-core/seatunnel-core-base/pom.xml
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
index 11e6f2e7..f9185ef4 100644
--- a/seatunnel-core/seatunnel-core-base/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
@@ -22,31 +22,29 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-core</artifactId>
+        <artifactId>seatunnel-transforms-spark</artifactId>
         <version>${revision}</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-core-base</artifactId>
+    <artifactId>seatunnel-transform-spark-uuid</artifactId>
 
     <dependencies>
-
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api-spark</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api-flink</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>com.beust</groupId>
-            <artifactId>jcommander</artifactId>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
         </dependency>
 
         <dependency>
@@ -54,5 +52,4 @@
             <artifactId>junit</artifactId>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
old mode 100755
new mode 100644
similarity index 55%
copy from seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
index 95160a6b..281a7242
--- a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
@@ -1,4 +1,3 @@
-#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -7,7 +6,7 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,24 +15,4 @@
 # limitations under the License.
 #
 
-set -eu
-APP_DIR=$(cd $(dirname ${0})/../;pwd)
-CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/lib/seatunnel-core-flink.jar
-
-if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
-    . "${CONF_DIR}/seatunnel-env.sh"
-fi
-
-CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.FlinkStarter ${@}) && EXIT_CODE=$? || EXIT_CODE=$?
-if [ ${EXIT_CODE} -eq 234 ]; then
-    # print usage
-    echo ${CMD}
-    exit 0
-elif [ ${EXIT_CODE} -eq 0 ]; then
-    echo "Execute SeaTunnel Flink Job: ${CMD}"
-    eval ${CMD}
-else
-    echo ${CMD}
-    exit ${EXIT_CODE}
-fi
+org.apache.seatunnel.spark.transform.UUID
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
similarity index 54%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
index 609dec15..829df588 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
@@ -14,73 +14,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
+import java.security.SecureRandom
+
 import scala.collection.JavaConversions._
-import scala.util.matching.Regex
 
 import com.google.common.annotations.VisibleForTesting
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.math3.random.{RandomGenerator, Well19937c}
 import org.apache.seatunnel.common.Constants
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.UUIDConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
 
-class Replace extends BaseSparkTransform {
+class UUID extends BaseSparkTransform {
+  private var prng: RandomGenerator = _
+
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val key = config.getString("fields")
+    val key = config.getString(FIELDS)
 
-    val func: UserDefinedFunction = udf((s: String) => {
-      replace(
-        s,
-        config.getString("pattern"),
-        config.getString("replacement"),
-        config.getBoolean("is_regex"),
-        config.getBoolean("replace_first"))
+    val func: UserDefinedFunction = udf(() => {
+      generate(config.getString(UUID_PREFIX))
     })
-    var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
+    var filterDf = df.withColumn(Constants.ROW_TMP, func())
     filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
     val ds = filterDf.drop(Constants.ROW_TMP)
     if (func != null) {
-      env.getSparkSession.udf.register("Replace", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields", "pattern", "replacement")
+    checkAllExists(config, FIELDS)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
-    val defaultConfig = ConfigFactory.parseMap(
-      Map(
-        "source_field" -> "raw_message",
-        "is_regex" -> false,
-        "replace_first" -> false))
+    val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE))
     config = config.withFallback(defaultConfig)
+
+    /**
+     * secure = true leaves prng unset, so generate() uses java.util.UUID.randomUUID
+     * (cryptographically strong, but can be comparatively slow).
+     * secure = false installs Well19937c seeded from SecureRandom: much faster and
+     * never blocks, but deterministic once seeded. Credit: whoschek@cloudera.com
+     * NOTE(review): confirm each Spark task does not replay the same prng state.
+     */
+    if (!config.getBoolean(UUID_SECURE)) {
+      val rand = new SecureRandom
+      val seed = for (_ <- 0 until 728) yield rand.nextInt
+      prng = new Well19937c(seed.toArray)
+    }
   }
 
   @VisibleForTesting
-  def replace(
-      str: String,
-      pattern: String,
-      replacement: String,
-      isRegex: Boolean,
-      replaceFirst: Boolean): String = {
+  def generate(prefix: String): String = {
+    val UUID = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong)
+    prefix + UUID
+  }
 
-    if (isRegex) {
-      if (replaceFirst) pattern.replaceFirstIn(str, replacement)
-      else pattern.replaceAllIn(str, replacement)
-    } else {
-      val max = if (replaceFirst) 1 else -1
-      StringUtils.replace(str, pattern, replacement, max)
-    }
+  // Only used for test
+  @VisibleForTesting
+  def setPrng(prng: RandomGenerator): Unit = {
+    this.prng = prng
   }
 
-  implicit def toReg(pattern: String): Regex = pattern.r
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
similarity index 72%
rename from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
rename to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
index 5f78e00f..4cf1aeae 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/apis/BaseTransform.java
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.apis;
+package org.apache.seatunnel.spark.transform
 
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-
-/**
- * a base interface indicates a transform plugin which will do transformations on data.
- */
-public interface BaseTransform<T extends RuntimeEnv> extends Plugin<T> {
+object UUIDConfig {
+  val PLUGIN_NAME = "UUID"
+  val UDF_NAME = PLUGIN_NAME
 
+  val FIELDS = "fields"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val UUID_PREFIX = "prefix"
+  val DEFAULT_UUID_PREFIX = ""
+  val UUID_SECURE = "secure"
+  val DEFAULT_UUID_SECURE = false
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
similarity index 62%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
index dddac46a..7d375118 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
@@ -14,27 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
+import java.security.SecureRandom
+
 import junit.framework.TestCase.assertEquals
+import org.apache.commons.math3.random.Well19937c
 import org.junit.Test
 
-class TestReplace {
+class TestUUID {
   @Test
-  def testReplaceReg() {
-    val udf = new Replace
-    assertEquals(
-      "world",
-      udf.replace("hello world", "([^ ]*) ([^ ]*)", "$2", isRegex = true, replaceFirst = false))
-    assertEquals(
-      "hello world",
-      udf.replace("hello world", "([^ ]*)", "$1", isRegex = true, replaceFirst = true))
+  def testUuid() {
+    val UUID = new UUID
+    assertEquals(36, UUID.generate("").length)
+    assertEquals(37, UUID.generate("u").length)
   }
 
   @Test
-  def testReplaceLiteral() {
-    val udf = new Replace
-    assertEquals("fee", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = false))
-    assertEquals("feo", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = true))
+  def testSecureUuid() {
+    val rand = new SecureRandom
+    val seed = for (_ <- 0 until 728) yield rand.nextInt
+    val prng = new Well19937c(seed.toArray)
+
+    val UUID = new UUID
+    UUID.setPrng(prng)
+    assertEquals(36, UUID.generate("").length)
+    assertEquals(37, UUID.generate("u").length)
   }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
index 144b84ad..e5026108 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java
@@ -27,6 +27,7 @@ public class FlinkRowSerialization implements RowSerialization<Row> {
 
     @Override
     public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException {
+
         return null;
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
new file mode 100644
index 00000000..f07fa71c
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -0,0 +1,16 @@
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.DataType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Converts a SeaTunnel {@link DataType} to a Flink type.
+ *
+ * @param <T> target Flink type
+ */
+public interface FlinkTypeConverter<T> {
+
+    TypeInformation<T> convert(DataType dataType);
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
new file mode 100644
index 00000000..3b26491f
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
@@ -0,0 +1,14 @@
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.DataType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public class StringConverter implements FlinkTypeConverter<String> {
+
+    @Override
+    public TypeInformation<String> convert(DataType dataType) {
+
+        return null;
+    }
+}