You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/16 03:55:37 UTC

[incubator-seatunnel-web] branch add_canvas_job_define updated: Add datasource config switcher (#55)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch add_canvas_job_define
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel-web.git


The following commit(s) were added to refs/heads/add_canvas_job_define by this push:
     new e0b6e75d Add datasource config switcher (#55)
e0b6e75d is described below

commit e0b6e75d45fe6246843d8f50fe9aff77fb30371d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 16 11:55:31 2023 +0800

    Add datasource config switcher (#55)
---
 .licenserc.yaml                                    |   1 +
 pom.xml                                            | 194 ++++++++-
 .../datasource-s3/pom.xml                          |  21 -
 .../seatunnel-datasource-plugins/pom.xml           |  15 +
 seatunnel-server/pom.xml                           | 116 -----
 seatunnel-server/seatunnel-app/pom.xml             |  43 +-
 .../app/common/SeaTunnelConnectorI18n.java         |  44 ++
 .../app/domain/request/connector/BusinessMode.java |  24 +
 .../app/domain/request/job/DataSourceOption.java   |  34 ++
 .../app/domain/request/job/SelectTableFields.java  |  35 ++
 .../seatunnel/app/domain/response/BaseInfo.java    |  34 ++
 .../response/connector/ConnectorFeature.java       |  28 ++
 .../domain/response/connector/ConnectorInfo.java   |  30 ++
 .../domain/response/connector/DataSourceInfo.java  |  30 ++
 .../response/connector/DataSourceInstance.java     |  32 ++
 .../response/datasource/DatasourceDetailRes.java   |  48 ++
 .../domain/response/datasource/DatasourceRes.java  |  44 ++
 .../response/datasource/VirtualTableDetailRes.java |  58 +++
 .../response/datasource/VirtualTableFieldRes.java  |  54 +++
 .../response/datasource/VirtualTableRes.java       |  44 ++
 .../AbstractDataSourceConfigSwitcher.java          | 163 +++++++
 .../datasource/DataSourceClientFactory.java        |  39 ++
 .../datasource/DataSourceConfigSwitcher.java       |  57 +++
 .../datasource/DataSourceConfigSwitcherUtils.java  | 135 ++++++
 .../app/thridparty/datasource/SchemaGenerator.java |  90 ++++
 .../impl/BaseJdbcDataSourceConfigSwitcher.java     | 186 ++++++++
 .../impl/ClickhouseDataSourceConfigSwitcher.java   | 152 +++++++
 .../ElasticSearchDataSourceConfigSwitcher.java     | 125 ++++++
 .../impl/KafkaDataSourceConfigSwitcher.java        | 129 ++++++
 .../impl/MysqlCDCDataSourceConfigSwitcher.java     | 179 ++++++++
 .../impl/MysqlDatasourceConfigSwitcher.java        |  25 ++
 .../impl/OracleDataSourceConfigSwitcher.java       |  67 +++
 .../impl/PostgresCDCDataSourceConfigSwitcher.java  | 185 ++++++++
 .../impl/PostgresqlDataSourceConfigSwitcher.java   |  60 +++
 .../impl/RedshiftDataSourceConfigSwitcher.java     |  58 +++
 .../impl/S3DataSourceConfigSwitcher.java           | 185 ++++++++
 .../impl/S3RedshiftDataSourceConfigSwitcher.java   |  84 ++++
 .../impl/SqlServerCDCDataSourceConfigSwitcher.java | 185 ++++++++
 .../impl/SqlServerDataSourceConfigSwitcher.java    |  33 ++
 .../impl/StarRocksDataSourceConfigSwitcher.java    | 109 +++++
 .../impl/TidbDataSourceConfigSwitcher.java         |  82 ++++
 .../exceptions/UnSupportWrapperException.java      |  30 ++
 .../app/thridparty/framework/FormOptionSort.java   |  67 +++
 .../thridparty/framework/PluginDiscoveryUtil.java  | 120 +++++
 .../framework/SeaTunnelOptionRuleWrapper.java      | 484 +++++++++++++++++++++
 .../framework/UnSupportWrapperException.java       |  30 ++
 .../org/apache/seatunnel/app/utils/JdbcUtils.java  |  59 +++
 seatunnel-web-dist/release-docs/LICENSE            |   1 +
 .../src/main/assembly/seatunnel-web-ci.xml         |  13 +
 .../src/main/assembly/seatunnel-web.xml            |  13 +
 tools/dependencies/known-dependencies.txt          |   4 +
 51 files changed, 3929 insertions(+), 149 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index ec23eba6..052f0bc1 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -41,5 +41,6 @@ header:
     - '**/.gitkeep'
     - '**/com/typesafe/config/**'
     - 'seatunnel-main-repository/**'
+    - 'seatunnel-web-dist/release-docs/**'
 
   comment: on-failure
diff --git a/pom.xml b/pom.xml
index b2a4da10..c015a38d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,8 @@
         <!-- dependency -->
         <commons.logging.version>1.2</commons.logging.version>
         <slf4j.version>1.7.25</slf4j.version>
-        <jackson.version>2.12.7.1</jackson.version>
+        <jackson-databind.version>2.12.7.1</jackson-databind.version>
+        <jackson.version>2.12.7</jackson.version>
         <mysql.version>8.0.16</mysql.version>
         <h2.version>2.1.214</h2.version>
         <junit.version>5.9.0</junit.version>
@@ -125,6 +126,23 @@
         <e2e.dependency.skip>true</e2e.dependency.skip>
         <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
         <flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
+
+        <spring-boot.version>2.6.8</spring-boot.version>
+        <spring.version>5.3.20</spring.version>
+        <mybatis-plus-boot-starter.version>3.5.3.1</mybatis-plus-boot-starter.version>
+        <druid-spring-boot-starter.version>1.2.9</druid-spring-boot-starter.version>
+        <springfox-swagger.version>2.6.1</springfox-swagger.version>
+        <swagger-annotations.version>1.5.10</swagger-annotations.version>
+        <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
+        <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+        <jsoup.version>1.14.3</jsoup.version>
+        <jwt.version>0.10.7</jwt.version>
+        <cron-utils.version>9.1.6</cron-utils.version>
+        <commons-io.version>2.11.0</commons-io.version>
+
+        <hadoop-uber.version>2.3.1</hadoop-uber.version>
+        <hadoop-aws.version>3.1.4</hadoop-aws.version>
+        <aws-java-sdk-bundle.version>1.11.271</aws-java-sdk-bundle.version>
     </properties>
 
     <dependencyManagement>
@@ -240,6 +258,28 @@
                 <groupId>org.apache.seatunnel</groupId>
                 <artifactId>seatunnel-plugin-discovery</artifactId>
                 <version>${seatunnel-framework.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>jcl-over-slf4j</artifactId>
+                        <groupId>org.slf4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j-1.2-api</artifactId>
+                        <groupId>org.apache.logging.log4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j-api</artifactId>
+                        <groupId>org.apache.logging.log4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j-core</artifactId>
+                        <groupId>org.apache.logging.log4j</groupId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>log4j-slf4j-impl</artifactId>
+                        <groupId>org.apache.logging.log4j</groupId>
+                    </exclusion>
+                </exclusions>
             </dependency>
 
             <dependency>
@@ -255,6 +295,109 @@
                 <scope>provided</scope>
             </dependency>
 
+            <!-- springboot and other dependencies -->
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-web</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-jetty</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-aop</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-jdbc</artifactId>
+                <version>${spring-boot.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-test</artifactId>
+                <version>${spring-boot.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>2.8.6</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>druid-spring-boot-starter</artifactId>
+                <version>${druid-spring-boot-starter.version}</version>
+            </dependency>
+
+            <!-- ORM -->
+            <dependency>
+                <groupId>com.baomidou</groupId>
+                <artifactId>mybatis-plus-boot-starter</artifactId>
+                <version>${mybatis-plus-boot-starter.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.springframework.boot</groupId>
+                        <artifactId>spring-boot-starter-jdbc</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <artifactId>spring-boot-autoconfigure</artifactId>
+                        <groupId>org.springframework.boot</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.hibernate.validator</groupId>
+                <artifactId>hibernate-validator</artifactId>
+                <version>${hibernate.validator.version}</version>
+            </dependency>
+
+            <!-- swagger -->
+            <dependency>
+                <groupId>io.springfox</groupId>
+                <artifactId>springfox-swagger2</artifactId>
+                <version>${springfox-swagger.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.springfox</groupId>
+                <artifactId>springfox-swagger-ui</artifactId>
+                <version>${springfox-swagger.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-annotations</artifactId>
+                <version>${swagger-annotations.version}</version>
+            </dependency>
+
+            <!-- JWT -->
+            <dependency>
+                <groupId>io.jsonwebtoken</groupId>
+                <artifactId>jjwt-api</artifactId>
+                <version>${jwt.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.jsonwebtoken</groupId>
+                <artifactId>jjwt-impl</artifactId>
+                <version>${jwt.version}</version>
+                <scope>runtime</scope>
+            </dependency>
+            <dependency>
+                <groupId>io.jsonwebtoken</groupId>
+                <artifactId>jjwt-jackson</artifactId>
+                <version>${jwt.version}</version>
+                <scope>runtime</scope>
+            </dependency>
+
+            <!--      http      -->
+            <dependency>
+                <groupId>org.jsoup</groupId>
+                <artifactId>jsoup</artifactId>
+                <version>${jsoup.version}</version>
+            </dependency>
+
             <!-- seatunnel connector test -->
             <dependency>
                 <groupId>org.apache.seatunnel</groupId>
@@ -296,6 +439,11 @@
                 <artifactId>commons-lang3</artifactId>
                 <version>${commons-lang3.version}</version>
             </dependency>
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>${commons-io.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-collections4</artifactId>
@@ -312,7 +460,7 @@
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>
-                <version>${jackson.version}</version>
+                <version>${jackson-databind.version}</version>
             </dependency>
 
             <dependency>
@@ -350,6 +498,48 @@
                 <version>${awaitility.version}</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+                <scope>provided</scope>
+                <version>${hadoop-uber.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.avro</groupId>
+                        <artifactId>avro</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-aws</artifactId>
+                <version>${hadoop-aws.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>jdk.tools</groupId>
+                        <artifactId>jdk.tools</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bundle</artifactId>
+                <version>${aws-java-sdk-bundle.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>com.cronutils</groupId>
+                <artifactId>cron-utils</artifactId>
+                <version>${cron-utils.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <artifactId>javassist</artifactId>
+                        <groupId>org.javassist</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
         </dependencies>
 
     </dependencyManagement>
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
index be117f58..59b39e58 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
@@ -24,12 +24,6 @@
 
     <artifactId>datasource-s3</artifactId>
 
-    <properties>
-        <hadoop-uber.version>2.4.4-WS-SNAPSHOT</hadoop-uber.version>
-        <hadoop-aws.version>3.1.4</hadoop-aws.version>
-        <aws-java-sdk-bundle.version>1.11.271</aws-java-sdk-bundle.version>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -39,29 +33,14 @@
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
-            <version>${hadoop-uber.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
-            <version>${hadoop-aws.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-bundle</artifactId>
-            <version>${aws-java-sdk-bundle.version}</version>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
index 039df6b4..91cdc740 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -1,4 +1,19 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
diff --git a/seatunnel-server/pom.xml b/seatunnel-server/pom.xml
index 7f1e1def..7bfbf9d6 100644
--- a/seatunnel-server/pom.xml
+++ b/seatunnel-server/pom.xml
@@ -32,120 +32,4 @@
         <module>seatunnel-scheduler</module>
         <module>seatunnel-server-common</module>
     </modules>
-
-    <properties>
-        <spring-boot.version>2.6.8</spring-boot.version>
-        <spring.version>5.3.20</spring.version>
-        <mybatis-plus-boot-starter.version>3.5.3.1</mybatis-plus-boot-starter.version>
-        <druid-spring-boot-starter.version>1.2.9</druid-spring-boot-starter.version>
-        <springfox-swagger.version>2.6.1</springfox-swagger.version>
-        <swagger-annotations.version>1.5.10</swagger-annotations.version>
-        <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
-        <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
-        <jsoup.version>1.14.3</jsoup.version>
-        <jwt.version>0.10.7</jwt.version>
-        <cron-utils.version>9.1.6</cron-utils.version>
-    </properties>
-
-    <dependencyManagement>
-        <dependencies>
-            <!-- springboot -->
-            <dependency>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-starter-web</artifactId>
-                <version>${spring-boot.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-starter-jetty</artifactId>
-                <version>${spring-boot.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-starter-aop</artifactId>
-                <version>${spring-boot.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-starter-jdbc</artifactId>
-                <version>${spring-boot.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>com.google.code.gson</groupId>
-                <artifactId>gson</artifactId>
-                <version>2.8.6</version>
-            </dependency>
-            <dependency>
-                <groupId>com.alibaba</groupId>
-                <artifactId>druid-spring-boot-starter</artifactId>
-                <version>${druid-spring-boot-starter.version}</version>
-            </dependency>
-
-            <!-- ORM -->
-            <dependency>
-                <groupId>com.baomidou</groupId>
-                <artifactId>mybatis-plus-boot-starter</artifactId>
-                <version>${mybatis-plus-boot-starter.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.springframework.boot</groupId>
-                        <artifactId>spring-boot-starter-jdbc</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <artifactId>spring-boot-autoconfigure</artifactId>
-                        <groupId>org.springframework.boot</groupId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
-                <groupId>org.hibernate.validator</groupId>
-                <artifactId>hibernate-validator</artifactId>
-                <version>${hibernate.validator.version}</version>
-            </dependency>
-
-            <!-- swagger -->
-            <dependency>
-                <groupId>io.springfox</groupId>
-                <artifactId>springfox-swagger2</artifactId>
-                <version>${springfox-swagger.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>io.springfox</groupId>
-                <artifactId>springfox-swagger-ui</artifactId>
-                <version>${springfox-swagger.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>io.swagger</groupId>
-                <artifactId>swagger-annotations</artifactId>
-                <version>${swagger-annotations.version}</version>
-            </dependency>
-
-            <!-- JWT -->
-            <dependency>
-                <groupId>io.jsonwebtoken</groupId>
-                <artifactId>jjwt-api</artifactId>
-                <version>${jwt.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>io.jsonwebtoken</groupId>
-                <artifactId>jjwt-impl</artifactId>
-                <version>${jwt.version}</version>
-                <scope>runtime</scope>
-            </dependency>
-            <dependency>
-                <groupId>io.jsonwebtoken</groupId>
-                <artifactId>jjwt-jackson</artifactId>
-                <version>${jwt.version}</version>
-                <scope>runtime</scope>
-            </dependency>
-
-            <!--      http      -->
-            <dependency>
-                <groupId>org.jsoup</groupId>
-                <artifactId>jsoup</artifactId>
-                <version>${jsoup.version}</version>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-
 </project>
\ No newline at end of file
diff --git a/seatunnel-server/seatunnel-app/pom.xml b/seatunnel-server/seatunnel-app/pom.xml
index 7458e589..ca908b06 100644
--- a/seatunnel-server/seatunnel-app/pom.xml
+++ b/seatunnel-server/seatunnel-app/pom.xml
@@ -30,11 +30,19 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-dynamicform</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-plugin-discovery</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-scheduler-dolphinscheduler</artifactId>
@@ -45,6 +53,26 @@
             <artifactId>seatunnel-server-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- datasource -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-datasource-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-plugins-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-s3</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!--springboot-->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -118,13 +146,11 @@
         <dependency>
             <groupId>io.springfox</groupId>
             <artifactId>springfox-swagger-ui</artifactId>
-            <version>${springfox-swagger.version}</version>
         </dependency>
 
         <dependency>
             <groupId>io.swagger</groupId>
             <artifactId>swagger-annotations</artifactId>
-            <version>${swagger-annotations.version}</version>
         </dependency>
 
         <dependency>
@@ -183,7 +209,6 @@
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
-            <version>${spring-boot.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -204,17 +229,15 @@
             <artifactId>spring-boot-starter-aop</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
         <!-- https://mvnrepository.com/artifact/com.cronutils/cron-utils -->
         <dependency>
             <groupId>com.cronutils</groupId>
             <artifactId>cron-utils</artifactId>
-            <version>${cron-utils.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>javassist</artifactId>
-                    <groupId>org.javassist</groupId>
-                </exclusion>
-            </exclusions>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java
new file mode 100644
index 00000000..96e4780b
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.commons.io.IOUtils;
+
+import java.nio.charset.StandardCharsets;
+
+public class SeaTunnelConnectorI18n {
+    public static Config CONNECTOR_I18N_CONFIG_EN;
+    public static Config CONNECTOR_I18N_CONFIG_ZH;
+
+    static {
+        try {
+            CONNECTOR_I18N_CONFIG_EN = ConfigFactory.parseString(IOUtils.toString(SeaTunnelConnectorI18n.class.getResourceAsStream("/i18n_en.config"), StandardCharsets.UTF_8));
+            CONNECTOR_I18N_CONFIG_ZH =
+                ConfigFactory.parseString(
+                    IOUtils.toString(
+                        SeaTunnelConnectorI18n.class.getResourceAsStream(
+                            "/i18n_zh.config"),
+                        StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java
new file mode 100644
index 00000000..73c906c5
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.connector;
+
+public enum BusinessMode { // business-mode selector; lives in the connector request package
+    DATA_REPLICA,
+    // NOTE(review): what distinguishes the two modes is not visible in this file — confirm with callers
+    DATA_INTEGRATION;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java
new file mode 100644
index 00000000..0cc8ebd9
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Job request object carrying a list of databases and a list of tables.
+ *
+ * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}, and both
+ * the no-args and the all-args constructor (argument order: databases, tables).
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class DataSourceOption {
+
+    // presumably database names — TODO confirm against the caller
+    private List<String> databases;
+
+    // presumably table names — TODO confirm against the caller
+    private List<String> tables;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java
new file mode 100644
index 00000000..6c4cd0a0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Job request object describing a selection of table fields.
+ *
+ * <p>Lombok generates the accessors, a builder, and both the no-args and the all-args
+ * constructor (argument order: all, tableFields).
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SelectTableFields {
+
+    // NOTE(review): presumably true means "every field is selected" — confirm.
+    private boolean all;
+    // presumably the selected field names — TODO confirm how this interacts with `all`
+    private List<String> tableFields;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java
new file mode 100644
index 00000000..3fd2dd60
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response;
+
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * Common creation/update metadata for response objects.
+ *
+ * <p>Extended by {@code DatasourceRes}, {@code VirtualTableDetailRes} and {@code
+ * VirtualTableRes}. Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class BaseInfo {
+
+    /** Name of the user who created the record. */
+    private String createUserName;
+
+    /** Time the record was created. */
+    private Date createTime;
+
+    /** Name of the user who last updated the record. */
+    private String updateUserName;
+
+    /** Time the record was last updated. */
+    private Date updateTime;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java
new file mode 100644
index 00000000..a77e95d3
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Feature flags of a connector, exposed in responses.
+ *
+ * <p>Only an all-args constructor is declared, so instances are created with every flag given.
+ */
+@Data
+@AllArgsConstructor
+public class ConnectorFeature {
+
+    // NOTE(review): presumably whether the connector can read a subset of columns — confirm.
+    private boolean supportColumnProjection;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java
new file mode 100644
index 00000000..615b2bf0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Pairs a connector's {@link PluginIdentifier} with an artifact id.
+ *
+ * <p>All-args constructor order: pluginIdentifier, artifactId.
+ */
+@Data
+@AllArgsConstructor
+public class ConnectorInfo {
+    /** Identifier of the connector plugin. */
+    private PluginIdentifier pluginIdentifier;
+    // presumably the build artifact that ships the connector — TODO confirm
+    private String artifactId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java
new file mode 100644
index 00000000..c8cdc912
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Associates a data source name with the {@link ConnectorInfo} it belongs to.
+ *
+ * <p>All-args constructor order: connectorInfo, datasourceName.
+ */
+@Data
+@AllArgsConstructor
+public class DataSourceInfo {
+
+    /** Connector information for this data source. */
+    private ConnectorInfo connectorInfo;
+
+    /** Name of the data source. */
+    private String datasourceName;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java
new file mode 100644
index 00000000..963056dc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * A named, identified instance of a data source.
+ *
+ * <p>All-args constructor order: dataSourceInfo, dataSourceInstanceName, dataSourceInstanceId.
+ */
+@Data
+@AllArgsConstructor
+public class DataSourceInstance {
+
+    /** Data source this instance belongs to. */
+    private DataSourceInfo dataSourceInfo;
+
+    /** Name of the instance. */
+    private String dataSourceInstanceName;
+
+    /** Id of the instance; boxed, so it may be null. */
+    private Long dataSourceInstanceId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java
new file mode 100644
index 00000000..4298f4bc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import lombok.Data;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Response object with the details of a single data source.
+ *
+ * <p>Declares its own create/update user and time fields; it does not extend {@code BaseInfo},
+ * which declares fields of the same names and types.
+ */
+@Data
+public class DatasourceDetailRes {
+
+    /** Data source id, carried as a string. */
+    private String id;
+
+    /** Name of the data source. */
+    private String datasourceName;
+
+    /** Name of the plugin backing the data source. */
+    private String pluginName;
+
+    /** Version of the plugin backing the data source. */
+    private String pluginVersion;
+
+    // todo check configuration
+    private Map<String, String> datasourceConfig;
+
+    /** Free-text description. */
+    private String description;
+
+    /** Name of the user who created the data source. */
+    private String createUserName;
+
+    /** Name of the user who last updated the data source. */
+    private String updateUserName;
+
+    /** Time the data source was created. */
+    private Date createTime;
+
+    /** Time the data source was last updated. */
+    private Date updateTime;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java
new file mode 100644
index 00000000..b702a124
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * Response object describing a data source; create/update names and times come from {@link
+ * BaseInfo}.
+ *
+ * <p>NOTE(review): {@code @Data} is used without {@code @EqualsAndHashCode(callSuper = true)};
+ * by Lombok's default the inherited fields are then left out of equals/hashCode — confirm that
+ * is intended.
+ */
+@Data
+public class DatasourceRes extends BaseInfo {
+
+    /** Data source id, carried as a string. */
+    private String id;
+
+    /** Name of the data source. */
+    private String datasourceName;
+
+    /** Name of the plugin backing the data source. */
+    private String pluginName;
+
+    /** Version of the plugin backing the data source. */
+    private String pluginVersion;
+
+    /** Free-text description. */
+    private String description;
+
+    /** Data source configuration as string key/value pairs. */
+    private Map<String, String> datasourceConfig;
+
+    /** Id of the creating user; primitive, so it defaults to 0 when unset. */
+    private int createUserId;
+
+    /** Id of the last updating user; primitive, so it defaults to 0 when unset. */
+    private int updateUserId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java
new file mode 100644
index 00000000..dba9dc6f
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Response object with the details of a virtual table, including its fields; create/update
+ * names and times come from {@link BaseInfo}.
+ *
+ * <p>NOTE(review): {@code @Data} is used without {@code @EqualsAndHashCode(callSuper = true)};
+ * by Lombok's default the inherited fields are then left out of equals/hashCode — confirm that
+ * is intended.
+ */
+@Data
+@ApiModel(value = "VirtualTable Detail Response", description = "virtual table detail info")
+public class VirtualTableDetailRes extends BaseInfo {
+
+    @ApiModelProperty(value = "table id", required = true, dataType = "String")
+    private String tableId;
+
+    @ApiModelProperty(value = "datasource id", required = true, dataType = "String")
+    private String datasourceId;
+
+    @ApiModelProperty(value = "datasource name", required = true, dataType = "String")
+    private String datasourceName;
+
+    @ApiModelProperty(value = "database name", required = true, dataType = "String")
+    private String databaseName;
+
+    @ApiModelProperty(value = "plugin name", required = true, dataType = "String")
+    private String pluginName;
+
+    @ApiModelProperty(value = "table name", required = true, dataType = "String")
+    private String tableName;
+
+    @ApiModelProperty(value = "table description", dataType = "String")
+    private String description;
+
+    // The fields (columns) of the virtual table, one VirtualTableFieldRes per field.
+    @ApiModelProperty(value = "table properties", dataType = "List")
+    private List<VirtualTableFieldRes> fields;
+
+    // Not annotated for Swagger, unlike the other fields.
+    // presumably the properties of the owning data source — TODO confirm
+    private Map<String, String> datasourceProperties;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java
new file mode 100644
index 00000000..09342dec
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Response object describing one field (column) of a virtual table.
+ *
+ * <p>Lombok generates the accessors, a builder, and both the no-args and the all-args
+ * constructor (argument order follows the field declaration order below).
+ */
+@Data
+@ApiModel(value = "VirtualTable Field Response", description = "virtual table field info")
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class VirtualTableFieldRes {
+
+    @ApiModelProperty(value = "field name", required = true, dataType = "String")
+    private String fieldName;
+
+    @ApiModelProperty(value = "field type", required = true, dataType = "String")
+    private String fieldType;
+
+    // The API description used to read "field length", which did not match this flag.
+    @ApiModelProperty(value = "field nullable", required = true, dataType = "Boolean")
+    private Boolean nullable;
+
+    @ApiModelProperty(value = "field default value", dataType = "String")
+    private String defaultValue;
+
+    @ApiModelProperty(value = "primary key", dataType = "Boolean")
+    private Boolean primaryKey;
+
+    @ApiModelProperty(value = "field comment", dataType = "String")
+    private String fieldComment;
+
+    @ApiModelProperty(value = "field extra", dataType = "String")
+    private String fieldExtra;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java
new file mode 100644
index 00000000..9d56bfbf
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import lombok.Data;
+
+/**
+ * Response object summarising a virtual table; create/update names and times come from {@link
+ * BaseInfo}.
+ *
+ * <p>NOTE(review): {@code @Data} is used without {@code @EqualsAndHashCode(callSuper = true)};
+ * by Lombok's default the inherited fields are then left out of equals/hashCode — confirm that
+ * is intended.
+ */
+@Data
+public class VirtualTableRes extends BaseInfo {
+
+    /** Virtual table id, carried as a string. */
+    private String tableId;
+
+    /** Id of the data source, carried as a string. */
+    private String datasourceId;
+
+    /** Name of the data source. */
+    private String datasourceName;
+
+    /** Name of the database. */
+    private String databaseName;
+
+    /** Name of the table. */
+    private String tableName;
+
+    /** Free-text description. */
+    private String description;
+
+    /** Name of the plugin. */
+    private String pluginName;
+
+    /** Id of the creating user; primitive, so it defaults to 0 when unset. */
+    private int createUserId;
+
+    /** Id of the last updating user; primitive, so it defaults to 0 when unset. */
+    private int updateUserId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..576b55a8
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.exceptions.UnSupportWrapperException;
+import org.apache.seatunnel.app.thridparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class AbstractDataSourceConfigSwitcher implements DataSourceConfigSwitcher {
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+
+        List<String> dataSourceRequiredAllKey =
+            Stream.concat(
+                    dataSourceOptionRule.getRequiredOptions().stream(),
+                    virtualTableOptionRule.getRequiredOptions().stream())
+                .flatMap(ro -> ro.getOptions().stream().map(Option::key))
+                .collect(Collectors.toList());
+
+        dataSourceRequiredAllKey.addAll(excludedKeys);
+
+        List<RequiredOption> requiredOptions =
+            connectorOptionRule.getRequiredOptions().stream()
+                .map(
+                    requiredOption -> {
+                        if (requiredOption instanceof RequiredOption.AbsolutelyRequiredOptions) {
+                            RequiredOption.AbsolutelyRequiredOptions
+                                absolutelyRequiredOptions =
+                                (RequiredOption.AbsolutelyRequiredOptions)
+                                    requiredOption;
+                            List<Option<?>> requiredOpList =
+                                absolutelyRequiredOptions.getOptions().stream()
+                                    .filter(
+                                        op -> {
+                                            return !dataSourceRequiredAllKey
+                                                .contains(op.key());
+                                        })
+                                    .collect(Collectors.toList());
+                            return requiredOpList.isEmpty() ? null
+                                : OptionRule.builder()
+                                .required(
+                                    requiredOpList.toArray(
+                                        new Option<?>[0]))
+                                .build()
+                                .getRequiredOptions()
+                                .get(0);
+                        }
+
+                        if (requiredOption instanceof RequiredOption.BundledRequiredOptions) {
+                            List<Option<?>> bundledRequiredOptions =
+                                requiredOption.getOptions();
+                            return bundledRequiredOptions.stream()
+                                .anyMatch(
+                                    op ->
+                                        dataSourceRequiredAllKey
+                                            .contains(op.key())) ? null
+                                : requiredOption;
+                        }
+
+                        if (requiredOption instanceof RequiredOption.ExclusiveRequiredOptions) {
+                            List<Option<?>> exclusiveOptions =
+                                requiredOption.getOptions();
+                            return exclusiveOptions.stream()
+                                .anyMatch(
+                                    op ->
+                                        dataSourceRequiredAllKey
+                                            .contains(op.key())) ? null
+                                : requiredOption;
+                        }
+
+                        if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) {
+                            List<Option<?>> conditionalRequiredOptions =
+                                requiredOption.getOptions();
+                            return conditionalRequiredOptions.stream()
+                                .anyMatch(
+                                    op ->
+                                        dataSourceRequiredAllKey
+                                            .contains(op.key())) ? null
+                                : requiredOption;
+                        }
+
+                        throw new UnSupportWrapperException(
+                            connectorName, "Unknown", requiredOption.toString());
+                    })
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        List<String> dataSourceOptionAllKey =
+            Stream.concat(
+                    dataSourceOptionRule.getOptionalOptions().stream(),
+                    virtualTableOptionRule.getOptionalOptions().stream())
+                .map(Option::key)
+                .collect(Collectors.toList());
+
+        dataSourceOptionAllKey.addAll(excludedKeys);
+
+        List<Option<?>> optionList =
+            connectorOptionRule.getOptionalOptions().stream()
+                .filter(option -> !dataSourceOptionAllKey.contains(option.key()))
+                .collect(Collectors.toList());
+
+        return SeaTunnelOptionRuleWrapper.wrapper(
+            optionList, requiredOptions, connectorName, pluginType);
+    }
+
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+
+        Config mergedConfig = connectorConfig;
+
+        for (Map.Entry<String, ConfigValue> en : dataSourceInstanceConfig.entrySet()) {
+            mergedConfig = mergedConfig.withValue(en.getKey(), en.getValue());
+        }
+
+        return mergedConfig;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java
new file mode 100644
index 00000000..4647e6eb
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.datasource.DataSourceClient;
+
+/**
+ * Holds a single, lazily created {@link DataSourceClient} and hands it out to callers.
+ */
+public class DataSourceClientFactory {
+    // volatile: the first null check below runs outside the lock, so the write made inside the
+    // synchronized block must be safely published to threads that never take the lock.
+    private static volatile DataSourceClient INSTANCE;
+
+    private static final Object LOCK = new Object();
+
+    /**
+     * Returns the shared client, creating it on the first call.
+     *
+     * @return the same {@link DataSourceClient} instance on every call
+     */
+    public static DataSourceClient getDataSourceClient() {
+        // Fast path: no locking once the instance exists.
+        if (null != INSTANCE) {
+            return INSTANCE;
+        }
+        synchronized (LOCK) {
+            // Re-check under the lock: another thread may have created it in the meantime.
+            if (null != INSTANCE) {
+                return INSTANCE;
+            }
+            INSTANCE = new DataSourceClient();
+            return INSTANCE;
+        }
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d7a4de8f
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+public interface DataSourceConfigSwitcher {
+
+    /**
+     * Use the OptionRule of the data source to filter the OptionRule of the connector.
+     *
+     * <p>Note the parameter order: {@code businessMode} comes before {@code pluginType} here,
+     * whereas {@code DataSourceConfigSwitcherUtils.filterOptionRule} takes them the other way
+     * round.
+     *
+     * @param connectorName          name of the connector
+     * @param dataSourceOptionRule   option rule of the data source
+     * @param virtualTableOptionRule option rule of the virtual table
+     * @param businessMode           business mode of the job
+     * @param pluginType             plugin type of the connector
+     * @param connectorOptionRule    option rule of the connector that is being filtered
+     * @param excludedKeys           option keys to exclude; presumably these are dropped from the
+     *                               resulting form - confirm against the implementations
+     * @return the form structure for the connector
+     */
+    FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys);
+
+    /**
+     * Merge the parameters of the data source instance and connector configuration into the final connector parameters.
+     *
+     * @param dataSourceInstanceConfig configuration of the data source instance
+     * @param virtualTableDetail       detail of the virtual table
+     * @param dataSourceOption         data source options of the job (the implementations read its
+     *                                 database and table lists)
+     * @param selectTableFields        the table fields selected for the job
+     * @param businessMode             business mode of the job
+     * @param pluginType               plugin type of the connector
+     * @param connectorConfig          connector configuration to merge into
+     * @return the merged connector configuration
+     */
+    Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig);
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java
new file mode 100644
index 00000000..2e5b3071
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.impl.ClickhouseDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.ElasticSearchDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.KafkaDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.MysqlCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.MysqlDatasourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.OracleDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.PostgresCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.PostgresqlDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.RedshiftDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.S3DataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.S3RedshiftDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.SqlServerCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.SqlServerDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.StarRocksDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.TidbDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.ArrayList;
+
+/**
+ * Static entry points that pick the {@link DataSourceConfigSwitcher} for a data source name and
+ * delegate to it.
+ */
+public class DataSourceConfigSwitcherUtils {
+
+    /**
+     * Delegates to {@link DataSourceConfigSwitcher#filterOptionRule} of the switcher selected by
+     * {@code datasourceName}, passing an empty list of excluded keys.
+     *
+     * <p>Note that {@code pluginType} comes before {@code businessMode} here, the opposite of the
+     * interface method.
+     *
+     * @param datasourceName         data source name, matched case-insensitively
+     * @param connectorName          name of the connector
+     * @param dataSourceOptionRule   option rule of the data source
+     * @param virtualTableOptionRule option rule of the virtual table
+     * @param pluginType             plugin type of the connector
+     * @param businessMode           business mode of the job
+     * @param connectorOptionRule    option rule of the connector
+     * @return the form structure produced by the selected switcher
+     * @throws NullPointerException if {@code datasourceName} is null
+     * @throws SeaTunnelException   if no switcher exists for {@code datasourceName}
+     */
+    public static FormStructure filterOptionRule(
+        String datasourceName,
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        PluginType pluginType,
+        BusinessMode businessMode,
+        OptionRule connectorOptionRule) {
+        return getDataSourceConfigSwitcher(datasourceName)
+            .filterOptionRule(
+                connectorName,
+                dataSourceOptionRule,
+                virtualTableOptionRule,
+                businessMode,
+                pluginType,
+                connectorOptionRule,
+                new ArrayList<>());
+    }
+
+    /**
+     * Delegates to {@link DataSourceConfigSwitcher#mergeDatasourceConfig} of the switcher
+     * selected by {@code datasourceName}.
+     *
+     * @param datasourceName           data source name, matched case-insensitively
+     * @param dataSourceInstanceConfig configuration of the data source instance
+     * @param virtualTableDetail       detail of the virtual table
+     * @param dataSourceOption         data source options of the job
+     * @param selectTableFields        the table fields selected for the job
+     * @param businessMode             business mode of the job
+     * @param pluginType               plugin type of the connector
+     * @param connectorConfig          connector configuration to merge into
+     * @return the merged connector configuration produced by the selected switcher
+     * @throws NullPointerException if {@code datasourceName} is null
+     * @throws SeaTunnelException   if no switcher exists for {@code datasourceName}
+     */
+    public static Config mergeDatasourceConfig(
+        String datasourceName,
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        return getDataSourceConfigSwitcher(datasourceName)
+            .mergeDatasourceConfig(
+                dataSourceInstanceConfig,
+                virtualTableDetail,
+                dataSourceOption,
+                selectTableFields,
+                businessMode,
+                pluginType,
+                connectorConfig);
+    }
+
+    /**
+     * Looks up the switcher for a data source name.
+     *
+     * @param datasourceName data source name in any letter case
+     * @return the switcher registered for that name
+     * @throws NullPointerException if {@code datasourceName} is null
+     * @throws SeaTunnelException   if the name matches none of the known data sources
+     */
+    private static DataSourceConfigSwitcher getDataSourceConfigSwitcher(String datasourceName) {
+        // The null check must run before any method is invoked on the name; otherwise a null
+        // name fails with a message-less NullPointerException and this check never fires.
+        checkNotNull(datasourceName, "datasourceName cannot be null");
+        // Locale.ROOT keeps the mapping independent of the JVM default locale; under a Turkish
+        // locale "i" upper-cases to a dotted capital I and names such as "jdbc-clickhouse"
+        // would no longer match their case label.
+        String normalizedName = datasourceName.toUpperCase(java.util.Locale.ROOT);
+        // NOTE(review): the original comment here read "Use SPI"; presumably the intent is to
+        // replace this hard-coded switch with SPI-based discovery - confirm.
+        switch (normalizedName) {
+            case "JDBC-MYSQL":
+                return MysqlDatasourceConfigSwitcher.INSTANCE;
+            case "ELASTICSEARCH":
+                return ElasticSearchDataSourceConfigSwitcher.INSTANCE;
+            case "KAFKA":
+                return KafkaDataSourceConfigSwitcher.getInstance();
+            case "STARROCKS":
+                return StarRocksDataSourceConfigSwitcher.INSTANCE;
+            case "MYSQL-CDC":
+                return MysqlCDCDataSourceConfigSwitcher.INSTANCE;
+            case "S3-REDSHIFT":
+                return S3RedshiftDataSourceConfigSwitcher.getInstance();
+            case "JDBC-CLICKHOUSE":
+                return ClickhouseDataSourceConfigSwitcher.getInstance();
+            case "JDBC-POSTGRES":
+                return PostgresqlDataSourceConfigSwitcher.getInstance();
+            case "JDBC-REDSHIFT":
+                return RedshiftDataSourceConfigSwitcher.getInstance();
+            case "JDBC-SQLSERVER":
+                return SqlServerDataSourceConfigSwitcher.getInstance();
+            case "JDBC-TIDB":
+                return TidbDataSourceConfigSwitcher.INSTANCE;
+            case "JDBC-ORACLE":
+                return OracleDataSourceConfigSwitcher.INSTANCE;
+            case "S3":
+                return S3DataSourceConfigSwitcher.getInstance();
+            case "SQLSERVER-CDC":
+                return SqlServerCDCDataSourceConfigSwitcher.INSTANCE;
+            case "POSTGRES-CDC":
+                return PostgresCDCDataSourceConfigSwitcher.INSTANCE;
+
+            default:
+                throw new SeaTunnelException(
+                    "data source : "
+                        + normalizedName
+                        + " is no implementation class for DataSourceConfigSwitcher");
+        }
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java
new file mode 100644
index 00000000..730ee658
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SchemaGenerator {
+
+    private SchemaGenerator() {
+    }
+
+    /**
+     * Generate the schema of the table.
+     *
+     * <pre>
+     * fields {
+     *        name = "string"
+     *        age = "int"
+     *       }
+     * </pre>
+     *
+     * <p>Each selected field name is used as a literal key of the {@code fields} object. Names
+     * are deliberately not interpreted as config path expressions, so a name containing a dot
+     * stays a single key instead of turning into a nested object.
+     *
+     * @param virtualTableDetailRes virtual table detail.
+     * @param selectTableFields     select table fields which need to be placed in the schema.
+     * @return schema.
+     * @throws NullPointerException     if an argument is null, or a selected field does not
+     *                                  exist in the virtual table
+     * @throws IllegalArgumentException if the selected fields or the virtual table fields are
+     *                                  empty
+     */
+    public static Config generateSchemaBySelectTableFields(
+        VirtualTableDetailRes virtualTableDetailRes, SelectTableFields selectTableFields) {
+        checkNotNull(selectTableFields, "selectTableFields cannot be null");
+        checkArgument(
+            CollectionUtils.isNotEmpty(selectTableFields.getTableFields()),
+            "selectTableFields.tableFields cannot be empty");
+
+        checkNotNull(virtualTableDetailRes, "virtualTableDetailRes cannot be null");
+        checkArgument(
+            CollectionUtils.isNotEmpty(virtualTableDetailRes.getFields()),
+            "virtualTableDetailRes.fields cannot be empty");
+
+        // Index the virtual table fields by name; duplicate names still fail fast in toMap.
+        Map<String, VirtualTableFieldRes> fieldsByName =
+            virtualTableDetailRes.getFields().stream()
+                .collect(
+                    Collectors.toMap(
+                        VirtualTableFieldRes::getFieldName, Function.identity()));
+
+        // Collect name -> type in selection order. The map is converted in one step below
+        // because Config.withValue would parse every field name as a path expression.
+        Map<String, Object> schemaFields = new java.util.LinkedHashMap<>();
+        for (String fieldName : selectTableFields.getTableFields()) {
+            // Guava formats the template only when the check fails.
+            VirtualTableFieldRes virtualTableFieldRes =
+                checkNotNull(
+                    fieldsByName.get(fieldName),
+                    "Cannot find the field: %s from virtual table",
+                    fieldName);
+            schemaFields.put(fieldName, virtualTableFieldRes.getFieldType());
+        }
+        // fromMap treats the map keys as plain keys, not as path expressions.
+        return ConfigValueFactory.fromMap(schemaFields).atKey("fields");
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..f8bba3dc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base switcher for JDBC data sources. It hides the options this class fills in itself from the
+ * connector form and derives {@code query} / {@code database} / {@code table} from the selected
+ * data source options. Subclasses can adapt the SQL dialect through the protected hooks
+ * {@link #tableFieldsToSql}, {@link #quoteIdentifier} and {@link #replaceDatabaseNameInUrl}.
+ */
+public abstract class BaseJdbcDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+    private static final String TABLE_KEY = "table";
+    private static final String DATABASE_KEY = "database";
+
+    private static final String QUERY_KEY = "query";
+
+    private static final String GENERATE_SINK_SQL = "generate_sink_sql";
+
+    private static final String URL_KEY = "url";
+
+    /**
+     * Filters the connector option rule, excluding the keys this switcher sets itself.
+     *
+     * <p>The incoming {@code excludedKeys} argument is not used; the list handed to the parent
+     * is chosen from {@code pluginType} alone and is {@code null} for any plugin type other than
+     * SINK or SOURCE.
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        Map<PluginType, List<String>> filterFieldMap = new HashMap<>();
+
+        filterFieldMap.put(
+            PluginType.SINK,
+            Arrays.asList(QUERY_KEY, TABLE_KEY, DATABASE_KEY, GENERATE_SINK_SQL));
+        filterFieldMap.put(PluginType.SOURCE, Collections.singletonList(QUERY_KEY));
+
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            filterFieldMap.get(pluginType));
+    }
+
+    /**
+     * Fills the JDBC specific connector options and then delegates to the parent merge.
+     *
+     * <ul>
+     *   <li>When exactly one database is selected, the data source url is passed through
+     *       {@link #replaceDatabaseNameInUrl}.
+     *   <li>For a SINK, {@code generate_sink_sql} is set to {@code false}.
+     *   <li>DATA_INTEGRATION: a SOURCE gets a generated {@code query}; a SINK gets
+     *       {@code database} and {@code table}.
+     *   <li>DATA_REPLICA: only SINK is supported and gets {@code database}.
+     * </ul>
+     *
+     * @throws UnsupportedOperationException if the business mode / plugin type combination is
+     *                                       not supported
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+
+        // replace database in url
+        if (dataSourceOption.getDatabases().size() == 1) {
+            String selectedDatabase = dataSourceOption.getDatabases().get(0);
+            String url = dataSourceInstanceConfig.getString(URL_KEY);
+            dataSourceInstanceConfig =
+                dataSourceInstanceConfig.withValue(
+                    URL_KEY,
+                    ConfigValueFactory.fromAnyRef(
+                        replaceDatabaseNameInUrl(url, selectedDatabase)));
+        }
+        if (pluginType == PluginType.SINK) {
+            // NOTE(review): the sink form hides "query" and this sets generate_sink_sql=false,
+            // which looks like it leaves the sink without any statement to run - confirm
+            // whether this should be true. Behavior intentionally left unchanged here.
+            connectorConfig =
+                connectorConfig.withValue(
+                    GENERATE_SINK_SQL, ConfigValueFactory.fromAnyRef(false));
+        }
+
+        if (businessMode == BusinessMode.DATA_INTEGRATION) {
+            String databaseName = dataSourceOption.getDatabases().get(0);
+            String tableName = dataSourceOption.getTables().get(0);
+
+            if (pluginType == PluginType.SOURCE) {
+                // create sql from schema
+                String sql =
+                    tableFieldsToSql(
+                        selectTableFields.getTableFields(), databaseName, tableName);
+                connectorConfig =
+                    connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql));
+            } else if (pluginType == PluginType.SINK) {
+                connectorConfig =
+                    connectorConfig
+                        .withValue(DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName))
+                        .withValue(TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName));
+            } else {
+                throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+            }
+        } else if (businessMode == BusinessMode.DATA_REPLICA) {
+            String databaseName = dataSourceOption.getDatabases().get(0);
+            if (pluginType != PluginType.SINK) {
+                throw new UnsupportedOperationException(
+                    "JDBC DATA_REPLICA Unsupported plugin type: " + pluginType);
+            }
+            connectorConfig =
+                connectorConfig.withValue(
+                    DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName));
+        } else {
+            throw new UnsupportedOperationException("Unsupported businessMode : " + businessMode);
+        }
+
+        // Every supported combination ends with the same parent merge.
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    /**
+     * Builds {@code SELECT f1, f2 FROM database[.schema].table} with every identifier passed
+     * through {@link #quoteIdentifier}.
+     *
+     * @param tableFields columns to select
+     * @param database    database name
+     * @param schema      schema name; skipped when null or empty
+     * @param table       table name
+     * @return the generated select statement
+     */
+    protected String generateSql(
+        List<String> tableFields, String database, String schema, String table) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ");
+        for (int i = 0; i < tableFields.size(); i++) {
+            sb.append(quoteIdentifier(tableFields.get(i)));
+            if (i < tableFields.size() - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append(" FROM ").append(quoteIdentifier(database));
+        if (schema != null && !schema.isEmpty()) {
+            sb.append(".").append(quoteIdentifier(schema));
+        }
+        sb.append(".").append(quoteIdentifier(table));
+
+        return sb.toString();
+    }
+
+    /**
+     * Generates the source query for the selected fields; this default uses no schema part.
+     */
+    protected String tableFieldsToSql(List<String> tableFields, String database, String table) {
+        return generateSql(tableFields, database, null, table);
+    }
+
+    /**
+     * Wraps an identifier in backticks. Backticks inside the identifier are doubled so that the
+     * name cannot terminate the quoted identifier early and alter the generated statement.
+     */
+    protected String quoteIdentifier(String identifier) {
+        // String.valueOf keeps the previous rendering of a null identifier ("`null`").
+        return "`" + String.valueOf(identifier).replace("`", "``") + "`";
+    }
+
+    /**
+     * Hook for putting the selected database into the JDBC url; this default returns the url
+     * unchanged.
+     */
+    protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+        return url;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..72fb36b7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Switcher for the Clickhouse data source: derives {@code sql}, {@code database}, {@code table}
+ * and {@code host} for the connector from the selected data source options.
+ */
+public class ClickhouseDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+    private static final ClickhouseDataSourceConfigSwitcher INSTANCE =
+        new ClickhouseDataSourceConfigSwitcher();
+
+    private static final String HOST = "host";
+    private static final String URL = "url";
+    private static final String SQL = "sql";
+    private static final String DATABASE = "database";
+    private static final String TABLE = "table";
+
+    // Option keys excluded from the connector form, per plugin type.
+    private static final Map<PluginType, List<String>> FILTER_FIELD_MAP =
+        new ImmutableMap.Builder<PluginType, List<String>>()
+            .put(PluginType.SOURCE, Lists.newArrayList(SQL, HOST))
+            .put(PluginType.SINK, Lists.newArrayList(HOST, DATABASE, TABLE))
+            .build();
+
+    /**
+     * Filters the connector option rule, excluding the keys this switcher sets itself. The
+     * incoming {@code excludedKeys} argument is not used; the list is chosen from
+     * {@code pluginType} alone.
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            FILTER_FIELD_MAP.get(pluginType));
+    }
+
+    /**
+     * Fills the Clickhouse specific connector options and then delegates to the parent merge.
+     *
+     * <ul>
+     *   <li>DATA_REPLICA: only SINK is supported and gets {@code database}.
+     *   <li>DATA_INTEGRATION: a SOURCE gets a generated {@code sql}; a SINK gets
+     *       {@code database} and {@code table}.
+     *   <li>In every case {@code host} is derived from the data source {@code url}.
+     * </ul>
+     *
+     * @throws UnsupportedOperationException if the plugin type is not supported for the given
+     *                                       business mode
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        switch (businessMode) {
+            case DATA_REPLICA:
+                // We only support sink in data replica mode
+                if (pluginType != PluginType.SINK) {
+                    throw new UnsupportedOperationException(
+                        "Clickhouse DATA_REPLICA Unsupported plugin type: " + pluginType);
+                }
+                connectorConfig =
+                    connectorConfig.withValue(
+                        DATABASE,
+                        ConfigValueFactory.fromAnyRef(
+                            dataSourceOption.getDatabases().get(0)));
+                break;
+            case DATA_INTEGRATION:
+                if (pluginType == PluginType.SOURCE) {
+                    // generate the sql by the schema
+                    String sql =
+                        String.format(
+                            "SELECT %s FROM `%s`.`%s`",
+                            quoteColumns(selectTableFields.getTableFields()),
+                            dataSourceOption.getDatabases().get(0),
+                            dataSourceOption.getTables().get(0));
+                    connectorConfig =
+                        connectorConfig.withValue(SQL, ConfigValueFactory.fromAnyRef(sql));
+                } else if (pluginType == PluginType.SINK) {
+                    connectorConfig =
+                        connectorConfig
+                            .withValue(
+                                DATABASE,
+                                ConfigValueFactory.fromAnyRef(
+                                    dataSourceOption.getDatabases().get(0)))
+                            .withValue(
+                                TABLE,
+                                ConfigValueFactory.fromAnyRef(
+                                    dataSourceOption.getTables().get(0)));
+                } else {
+                    throw new UnsupportedOperationException(
+                        "Unsupported plugin type: " + pluginType);
+                }
+                break;
+            default:
+                // Other business modes get no mode specific options; only host is set below.
+                break;
+        }
+        connectorConfig =
+            connectorConfig.withValue(
+                HOST,
+                ConfigValueFactory.fromAnyRef(
+                    JdbcUtils.getAddressFromUrl(
+                        dataSourceInstanceConfig.getString(URL))));
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    /**
+     * Joins the column names with commas, wrapping each in backticks the same way the database
+     * and table names of the generated statement are quoted, so that names which are reserved
+     * words or contain special characters still yield a valid statement.
+     */
+    private static String quoteColumns(List<String> tableFields) {
+        StringBuilder columns = new StringBuilder();
+        for (String field : tableFields) {
+            if (columns.length() > 0) {
+                columns.append(",");
+            }
+            columns.append("`").append(field).append("`");
+        }
+        return columns.toString();
+    }
+
+    public static ClickhouseDataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+
+    private ClickhouseDataSourceConfigSwitcher() {
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..a378fae7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Datasource config switcher for the Elasticsearch connector.
+ *
+ * <p>{@link #filterOptionRule} appends Elasticsearch-specific option keys to the excluded-key
+ * list, and {@link #mergeDatasourceConfig} writes the {@code index} (and, for sources, the
+ * {@code source} field list) into the connector config before delegating to the parent class.
+ */
+public class ElasticSearchDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private static final String SOURCE = "source";
+    private static final String SCHEMA = "schema";
+    private static final String PRIMARY_KEYS = "primary_keys";
+    private static final String INDEX = "index";
+
+    /** Shared instance; the constructor is private. */
+    public static final ElasticSearchDataSourceConfigSwitcher INSTANCE =
+        new ElasticSearchDataSourceConfigSwitcher();
+
+    private ElasticSearchDataSourceConfigSwitcher() {
+    }
+
+    /**
+     * Adds the Elasticsearch-specific keys to {@code excludedKeys} and delegates to the parent.
+     *
+     * <p>Source: {@code source}, {@code schema} and {@code index}. Sink: {@code index}, plus
+     * {@code primary_keys} when the business mode is {@code DATA_REPLICA}.
+     *
+     * @param excludedKeys list that is extended in place and then handed to the parent
+     * @return whatever the parent implementation returns
+     * @throws UnsupportedOperationException if {@code pluginType} is neither SOURCE nor SINK
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        final boolean isSource = PluginType.SOURCE.equals(pluginType);
+        if (!isSource && !PluginType.SINK.equals(pluginType)) {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        if (isSource) {
+            // source, schema and index are not offered as connector options
+            excludedKeys.addAll(Arrays.asList(SOURCE, SCHEMA, INDEX));
+        } else {
+            excludedKeys.add(INDEX);
+            // primary_keys is additionally excluded in replica mode
+            if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
+                excludedKeys.add(PRIMARY_KEYS);
+            }
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    /**
+     * Writes the selected index (and source fields) into {@code connectorConfig}, then delegates
+     * to the parent implementation with the updated config.
+     *
+     * @throws UnsupportedOperationException if {@code pluginType} is neither SOURCE nor SINK, or
+     *     if a source is used with a business mode other than {@code DATA_INTEGRATION}
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        final Config merged;
+        if (PluginType.SOURCE.equals(pluginType)) {
+            merged =
+                mergeSource(dataSourceOption, selectTableFields, businessMode, connectorConfig);
+        } else if (PluginType.SINK.equals(pluginType)) {
+            merged = mergeSink(dataSourceOption, businessMode, connectorConfig);
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            merged);
+    }
+
+    /** Source side: sets {@code index} to the first selected table and {@code source} to the fields. */
+    private Config mergeSource(
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        Config connectorConfig) {
+        if (!businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+            throw new UnsupportedOperationException(
+                "Unsupported business mode: " + businessMode);
+        }
+        return connectorConfig
+            .withValue(
+                INDEX, ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)))
+            .withValue(
+                SOURCE, ConfigValueFactory.fromIterable(selectTableFields.getTableFields()));
+    }
+
+    /** Sink side: sets {@code index} in integration mode; other modes leave the config as is. */
+    private Config mergeSink(
+        DataSourceOption dataSourceOption, BusinessMode businessMode, Config connectorConfig) {
+        // TODO Add primary_keys
+        if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+            return connectorConfig.withValue(
+                INDEX, ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)));
+        }
+        return connectorConfig;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..3e7b5fdd
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.SchemaGenerator;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.List;
+
+/**
+ * Datasource config switcher for the Kafka connector.
+ *
+ * <p>For sources the {@code topic} comes from the virtual table's datasource properties and the
+ * {@code schema} is generated from the selected fields. For sinks the {@code topic} is filled in
+ * integration mode and the {@code format} is fixed to {@code COMPATIBLE_DEBEZIUM_JSON} in replica
+ * mode.
+ */
+public class KafkaDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private static final String SCHEMA = "schema";
+    private static final String TOPIC = "topic";
+    private static final String TABLE = "table";
+    private static final String FORMAT = "format";
+
+    private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+    private static final KafkaDataSourceConfigSwitcher INSTANCE =
+        new KafkaDataSourceConfigSwitcher();
+
+    private KafkaDataSourceConfigSwitcher() {
+    }
+
+    /** Returns the shared instance of this switcher. */
+    public static KafkaDataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Adds the Kafka-specific keys to {@code excludedKeys} and delegates to the parent.
+     *
+     * <p>Source: {@code schema} and {@code topic}. Sink in {@code DATA_REPLICA} mode:
+     * {@code format}. Any other combination adds nothing.
+     *
+     * @param excludedKeys list that is extended in place and then handed to the parent
+     * @return whatever the parent implementation returns
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            excludedKeys.add(SCHEMA);
+            excludedKeys.add(TOPIC);
+        } else if (PluginType.SINK.equals(pluginType)
+            && businessMode.equals(BusinessMode.DATA_REPLICA)) {
+            excludedKeys.add(FORMAT);
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    /**
+     * Writes the Kafka-specific values into {@code connectorConfig}, then delegates to the parent
+     * implementation with the updated config.
+     *
+     * @throws UnsupportedOperationException if {@code pluginType} is neither SOURCE nor SINK
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        Config merged;
+        if (PluginType.SOURCE.equals(pluginType)) {
+            // Topic from the virtual table, schema generated from the selected fields
+            merged =
+                withTopic(connectorConfig, virtualTableDetail)
+                    .withValue(
+                        SCHEMA,
+                        SchemaGenerator.generateSchemaBySelectTableFields(
+                                virtualTableDetail, selectTableFields)
+                            .root());
+        } else if (PluginType.SINK.equals(pluginType)) {
+            merged = connectorConfig;
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                // Set the table name to topic
+                merged = withTopic(merged, virtualTableDetail);
+            }
+            if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
+                merged = merged.withValue(FORMAT, ConfigValueFactory.fromAnyRef(DEBEZIUM_FORMAT));
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            merged);
+    }
+
+    /** Returns {@code config} with {@code topic} set from the virtual table's datasource properties. */
+    private Config withTopic(Config config, VirtualTableDetailRes virtualTableDetail) {
+        return config.withValue(
+            TOPIC,
+            ConfigValueFactory.fromAnyRef(
+                virtualTableDetail.getDatasourceProperties().get(TOPIC)));
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..2d99e9bd
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+public class MysqlCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private MysqlCDCDataSourceConfigSwitcher() {
+    }
+
+    public static final MysqlCDCDataSourceConfigSwitcher INSTANCE =
+        new MysqlCDCDataSourceConfigSwitcher();
+
+    private static final String FACTORY = "factory";
+
+    private static final String CATALOG = "catalog";
+
+    private static final String TABLE_NAMES = "table-names";
+
+    private static final String DATABASE_NAMES = "database-names";
+
+    private static final String FORMAT_KEY = "format";
+
+    private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+    private static final String DEFAULT_FORMAT = "DEFAULT";
+
+    private static final String SCHEMA = "schema";
+
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            excludedKeys.add(DATABASE_NAMES);
+            excludedKeys.add(TABLE_NAMES);
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                excludedKeys.add(FORMAT_KEY);
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            // Add table-names
+            Config config = ConfigFactory.empty();
+            config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("Mysql"));
+            connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+            connectorConfig =
+                connectorConfig.withValue(
+                    DATABASE_NAMES,
+                    ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+            connectorConfig =
+                connectorConfig.withValue(
+                    TABLE_NAMES,
+                    ConfigValueFactory.fromIterable(
+                        mergeDatabaseAndTables(dataSourceOption)));
+
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                connectorConfig =
+                    connectorConfig.withValue(
+                        FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+            } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+                && connectorConfig
+                .getString(FORMAT_KEY)
+                .toUpperCase(Locale.ROOT)
+                .equals(DEBEZIUM_FORMAT)) {
+                connectorConfig =
+                    connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    private Config generateDebeziumFormatSchema() {
+        List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+        fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+        Config schema = ConfigFactory.empty();
+        for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+            schema =
+                schema.withValue(
+                    virtualTableFieldRes.getFieldName(),
+                    ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+        }
+        return schema.atKey("fields");
+    }
+
+    private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+        List<String> tables = new ArrayList<>();
+        dataSourceOption
+            .getDatabases()
+            .forEach(
+                database -> {
+                    dataSourceOption
+                        .getTables()
+                        .forEach(
+                            table -> {
+                                if (table.contains(".")) {
+                                    tables.add(table);
+                                } else {
+                                    tables.add(
+                                        getDatabaseAndTable(database, table));
+                                }
+                            });
+                });
+        return tables;
+    }
+
+    private String getDatabaseAndTable(String database, String table) {
+        return String.format("%s.%s", database, table);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java
new file mode 100644
index 00000000..c27d323d
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+/**
+ * Datasource config switcher for MySQL; it declares no behavior of its own beyond what it
+ * inherits from {@link BaseJdbcDataSourceConfigSwitcher}.
+ */
+public class MysqlDatasourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+    /** Shared instance; declared {@code final} so it cannot be reassigned. */
+    public static final MysqlDatasourceConfigSwitcher INSTANCE =
+        new MysqlDatasourceConfigSwitcher();
+
+    private MysqlDatasourceConfigSwitcher() {
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..43d85c84
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * JDBC datasource config switcher for Oracle.
+ *
+ * <p>Table names must have the form {@code schemaName.tableName}; identifiers in the generated
+ * query are wrapped in double quotes.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class OracleDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+    /** Shared instance; the generated no-args constructor is private. */
+    public static final OracleDataSourceConfigSwitcher INSTANCE =
+            new OracleDataSourceConfigSwitcher();
+
+    /**
+     * Splits {@code fullTable} into schema and table and builds the select statement for it.
+     *
+     * @throws SeaTunnelException if {@code fullTable} does not split into exactly two parts on
+     *     {@code '.'}
+     */
+    protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+        final String[] parts = fullTable.split("\\.");
+        if (parts.length == 2) {
+            // parts[0] is the schema name, parts[1] the table name
+            return generateSql(tableFields, database, parts[0], parts[1]);
+        }
+        throw new SeaTunnelException(
+                "The tableName for oracle must be schemaName.tableName, but tableName is "
+                        + fullTable);
+    }
+
+    /** Wraps {@code identifier} in double quotes. */
+    protected String quoteIdentifier(String identifier) {
+        return "\"" + identifier + "\"";
+    }
+
+    /**
+     * Builds {@code SELECT "f1", "f2" FROM "schema"."table"}; {@code database} is not used.
+     */
+    @Override
+    protected String generateSql(
+            List<String> tableFields, String database, String schema, String table) {
+        final StringBuilder sql = new StringBuilder("SELECT ");
+        String separator = "";
+        for (String field : tableFields) {
+            sql.append(separator).append(quoteIdentifier(field));
+            separator = ", ";
+        }
+        return sql.append(" FROM ")
+                .append(quoteIdentifier(schema))
+                .append(".")
+                .append(quoteIdentifier(table))
+                .toString();
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..1b282dc0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Datasource config switcher for the Postgres CDC source connector.
+ *
+ * <p>Only {@link PluginType#SOURCE} is supported; every other plugin type is rejected with an
+ * {@link UnsupportedOperationException}.
+ */
+public class PostgresCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private PostgresCDCDataSourceConfigSwitcher() {
+    }
+
+    /** Shared instance; the constructor is private. */
+    public static final PostgresCDCDataSourceConfigSwitcher INSTANCE =
+        new PostgresCDCDataSourceConfigSwitcher();
+
+    private static final String FACTORY = "factory";
+
+    private static final String CATALOG = "catalog";
+
+    private static final String TABLE_NAMES = "table-names";
+
+    private static final String DATABASE_NAMES = "database-names";
+
+    private static final String FORMAT_KEY = "format";
+
+    private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+    private static final String DEFAULT_FORMAT = "DEFAULT";
+
+    private static final String SCHEMA = "schema";
+
+    /**
+     * Adds {@code database-names} and {@code table-names} (and {@code format} in
+     * {@code DATA_INTEGRATION} mode) to {@code excludedKeys}, then delegates to the parent.
+     *
+     * @param excludedKeys list that is extended in place and then handed to the parent
+     * @return whatever the parent implementation returns
+     * @throws UnsupportedOperationException if {@code pluginType} is not SOURCE
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            excludedKeys.add(DATABASE_NAMES);
+            excludedKeys.add(TABLE_NAMES);
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                excludedKeys.add(FORMAT_KEY);
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    /**
+     * Writes the catalog factory, database names and qualified table names into
+     * {@code connectorConfig}, adjusts {@code format}/{@code schema} for the business mode, and
+     * delegates to the parent implementation with the updated config.
+     *
+     * <p>In {@code DATA_INTEGRATION} mode {@code format} is forced to {@code DEFAULT}. In
+     * {@code DATA_REPLICA} mode a fixed topic/key/value schema is added when the configured
+     * format is {@code COMPATIBLE_DEBEZIUM_JSON} (case-insensitive); a missing {@code format}
+     * option is treated as "not Debezium" instead of failing.
+     *
+     * @throws UnsupportedOperationException if {@code pluginType} is not SOURCE
+     * @throws IllegalArgumentException if a selected table name has neither two nor three
+     *     dot-separated parts
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            // catalog { factory = "Postgres" }
+            Config config = ConfigFactory.empty();
+            config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("Postgres"));
+            connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+            connectorConfig =
+                connectorConfig.withValue(
+                    DATABASE_NAMES,
+                    ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+            // Add table-names
+            connectorConfig =
+                connectorConfig.withValue(
+                    TABLE_NAMES,
+                    ConfigValueFactory.fromIterable(
+                        mergeDatabaseAndTables(dataSourceOption)));
+
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                connectorConfig =
+                    connectorConfig.withValue(
+                        FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+            } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+                && isDebeziumFormat(connectorConfig)) {
+                connectorConfig =
+                    connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    /**
+     * Tells whether {@code format} is set to the Debezium-compatible format.
+     *
+     * <p>{@code hasPath} is checked first because {@code getString} throws when the option is
+     * absent, which would otherwise abort the merge for jobs that never set a format.
+     */
+    private boolean isDebeziumFormat(Config connectorConfig) {
+        return connectorConfig.hasPath(FORMAT_KEY)
+            && DEBEZIUM_FORMAT.equals(
+                connectorConfig.getString(FORMAT_KEY).toUpperCase(Locale.ROOT));
+    }
+
+    /** Builds {@code fields { topic = string, key = string, value = string }}. */
+    private Config generateDebeziumFormatSchema() {
+        List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+        fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+        Config schema = ConfigFactory.empty();
+        for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+            schema =
+                schema.withValue(
+                    virtualTableFieldRes.getFieldName(),
+                    ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+        }
+        return schema.atKey("fields");
+    }
+
+    /**
+     * Produces the {@code table-names} list from the selected databases and tables.
+     *
+     * <p>A three-part name is kept as is; a two-part name is prefixed with each selected
+     * database. Each resulting name appears once, in first-occurrence order, so a three-part
+     * name is not repeated for every selected database.
+     *
+     * @throws IllegalArgumentException if a table name has neither two nor three parts
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+        List<String> tables = new ArrayList<>();
+        for (String database : dataSourceOption.getDatabases()) {
+            for (String table : dataSourceOption.getTables()) {
+                final String[] tableFragments = table.split("\\.");
+                final String qualified;
+                if (tableFragments.length == 3) {
+                    qualified = table;
+                } else if (tableFragments.length == 2) {
+                    qualified = getDatabaseAndTable(database, table);
+                } else {
+                    throw new IllegalArgumentException("Illegal postgres table-name: " + table);
+                }
+                // Skip names already emitted for a previous database
+                if (!tables.contains(qualified)) {
+                    tables.add(qualified);
+                }
+            }
+        }
+        return tables;
+    }
+
+    private String getDatabaseAndTable(String database, String table) {
+        return String.format("%s.%s", database, table);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..90a983d9
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.thridparty.datasource.DataSourceConfigSwitcher;
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import java.util.List;
+
+/**
+ * Config switcher for PostgreSQL data sources, exposed as a singleton through
+ * {@link #getInstance()}.
+ */
+public class PostgresqlDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+
+    private static final PostgresqlDataSourceConfigSwitcher INSTANCE =
+        new PostgresqlDataSourceConfigSwitcher();
+
+    private PostgresqlDataSourceConfigSwitcher() {
+    }
+
+    /**
+     * Returns the shared instance of this switcher.
+     *
+     * @return the singleton instance
+     */
+    public static DataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Splits {@code fullTable} into schema name and table name and delegates to
+     * {@code generateSql}.
+     *
+     * @param tableFields the fields passed through to {@code generateSql}
+     * @param database the database passed through to {@code generateSql}
+     * @param fullTable the table name in the form {@code schemaName.tableName}
+     * @return the statement produced by {@code generateSql}
+     * @throws SeaTunnelException if {@code fullTable} does not split into exactly two parts
+     */
+    protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+        String[] schemaAndTable = fullTable.split("\\.");
+        if (schemaAndTable.length == 2) {
+            return generateSql(tableFields, database, schemaAndTable[0], schemaAndTable[1]);
+        }
+        throw new SeaTunnelException(
+            "The tableName for postgres must be schemaName.tableName, but tableName is "
+                + fullTable);
+    }
+
+    /**
+     * Wraps the identifier in double quotes.
+     *
+     * @param identifier the identifier to quote
+     * @return the identifier surrounded by double quotes
+     */
+    protected String quoteIdentifier(String identifier) {
+        return String.format("\"%s\"", identifier);
+    }
+
+    /**
+     * Replaces the database in the given URL by delegating to {@code JdbcUtils.replaceDatabase}.
+     *
+     * @param url the JDBC URL to rewrite
+     * @param databaseName the database name to put into the URL
+     * @return the value returned by {@code JdbcUtils.replaceDatabase}
+     */
+    protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+        return JdbcUtils.replaceDatabase(url, databaseName);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..98da884c
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import java.util.List;
+
+public class RedshiftDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+    private static final RedshiftDataSourceConfigSwitcher INSTANCE =
+        new RedshiftDataSourceConfigSwitcher();
+
+    public static final RedshiftDataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+
+    protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+
+        String[] split = fullTable.split("\\.");
+        if (split.length != 2) {
+            throw new SeaTunnelException(
+                "The tableName for postgres must be schemaName.tableName, but tableName is "
+                    + fullTable);
+        }
+
+        String schemaName = split[0];
+        String tableName = split[1];
+
+        return generateSql(tableFields, database, schemaName, tableName);
+    }
+
+    protected String quoteIdentifier(String identifier) {
+        return "\"" + identifier + "\"";
+    }
+
+    protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+        return JdbcUtils.replaceDatabase(url, databaseName);
+    }
+
+    private RedshiftDataSourceConfigSwitcher() {
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d0c30cf0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.SchemaGenerator;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.datasource.plugin.s3.S3OptionRule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class S3DataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private S3DataSourceConfigSwitcher() {
+    }
+
+    private static final S3DataSourceConfigSwitcher INSTANCE = new S3DataSourceConfigSwitcher();
+
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        excludedKeys.add(S3OptionRule.PATH.key());
+        if (PluginType.SOURCE.equals(pluginType)) {
+            excludedKeys.add(S3OptionRule.SCHEMA.key());
+        }
+
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            connectorConfig =
+                connectorConfig
+                    .withValue(
+                        S3OptionRule.SCHEMA.key(),
+                        SchemaGenerator.generateSchemaBySelectTableFields(
+                                virtualTableDetail, selectTableFields)
+                            .root())
+                    .withValue(
+                        S3OptionRule.PATH.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.PATH.key())))
+                    .withValue(
+                        S3OptionRule.TYPE.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.TYPE.key())))
+                    .withValue(
+                        S3OptionRule.PARSE_PARSE_PARTITION_FROM_PATH.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(
+                                    S3OptionRule
+                                        .PARSE_PARSE_PARTITION_FROM_PATH
+                                        .key())))
+                    .withValue(
+                        S3OptionRule.DATE_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.DATE_FORMAT.key())))
+                    .withValue(
+                        S3OptionRule.DATETIME_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.DATETIME_FORMAT.key())))
+                    .withValue(
+                        S3OptionRule.TIME_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.TIME_FORMAT.key())));
+        } else if (PluginType.SINK.equals(pluginType)) {
+            if (virtualTableDetail.getDatasourceProperties().get(S3OptionRule.TIME_FORMAT.key()) == null) {
+                throw new IllegalArgumentException("S3 virtual table path is null");
+            }
+            connectorConfig =
+                connectorConfig
+                    .withValue(
+                        S3OptionRule.PATH.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.PATH.key())))
+                    .withValue(
+                        S3OptionRule.TYPE.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.TYPE.key())))
+                    .withValue(
+                        S3OptionRule.PARSE_PARSE_PARTITION_FROM_PATH.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(
+                                    S3OptionRule
+                                        .PARSE_PARSE_PARTITION_FROM_PATH
+                                        .key())))
+                    .withValue(
+                        S3OptionRule.DATE_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.DATE_FORMAT.key())))
+                    .withValue(
+                        S3OptionRule.DATETIME_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.DATETIME_FORMAT.key())))
+                    .withValue(
+                        S3OptionRule.TIME_FORMAT.key(),
+                        ConfigValueFactory.fromAnyRef(
+                            virtualTableDetail
+                                .getDatasourceProperties()
+                                .get(S3OptionRule.TIME_FORMAT.key())));
+        }
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    public static S3DataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..dd7b208a
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+/**
+ * Config switcher that adds the {@code access_key} and {@code secret_key} options to the
+ * excluded keys, exposed as a singleton through {@link #getInstance()}.
+ */
+public class S3RedshiftDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private static final String ACCESS_KEY = "access_key";
+
+    private static final String SECRET_KEY = "secret_key";
+
+    private static final S3RedshiftDataSourceConfigSwitcher INSTANCE =
+        new S3RedshiftDataSourceConfigSwitcher();
+
+    private S3RedshiftDataSourceConfigSwitcher() {
+    }
+
+    /**
+     * Returns the shared instance of this switcher.
+     *
+     * @return the singleton instance
+     */
+    public static S3RedshiftDataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Appends the access key and secret key option names to {@code excludedKeys} (modifying the
+     * passed-in list) and then delegates to the parent implementation.
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        excludedKeys.add(ACCESS_KEY);
+        excludedKeys.add(SECRET_KEY);
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    /** Delegates unchanged to the parent implementation. */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..807d6e4c
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Config switcher for the SqlServer CDC source. Only source plugins are supported; any other
+ * plugin type is rejected with an {@link UnsupportedOperationException}.
+ */
+public class SqlServerCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private SqlServerCDCDataSourceConfigSwitcher() {
+    }
+
+    public static final SqlServerCDCDataSourceConfigSwitcher INSTANCE =
+        new SqlServerCDCDataSourceConfigSwitcher();
+
+    private static final String FACTORY = "factory";
+
+    private static final String CATALOG = "catalog";
+
+    private static final String TABLE_NAMES = "table-names";
+
+    private static final String DATABASE_NAMES = "database-names";
+
+    private static final String FORMAT_KEY = "format";
+
+    private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+    private static final String DEFAULT_FORMAT = "DEFAULT";
+
+    private static final String SCHEMA = "schema";
+
+    /**
+     * Adds the database-names and table-names keys (and, in data integration mode, the format
+     * key) to {@code excludedKeys} and then delegates to the parent implementation.
+     *
+     * <p>Note that the passed-in {@code excludedKeys} list is modified in place.
+     *
+     * @throws UnsupportedOperationException if {@code pluginType} is not a source
+     */
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            excludedKeys.add(DATABASE_NAMES);
+            excludedKeys.add(TABLE_NAMES);
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                excludedKeys.add(FORMAT_KEY);
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    /**
+     * Writes the catalog factory, database names and table names into the connector config and
+     * then delegates to the parent implementation.
+     *
+     * <p>In data integration mode the format is forced to {@code DEFAULT}. In data replica mode
+     * a schema with the {@code topic}, {@code key} and {@code value} fields is added when the
+     * configured format is {@code COMPATIBLE_DEBEZIUM_JSON} (compared case-insensitively).
+     *
+     * @throws UnsupportedOperationException if {@code pluginType} is not a source
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            // Add table-names
+            Config config = ConfigFactory.empty();
+            config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("SqlServer"));
+            connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+            connectorConfig =
+                connectorConfig.withValue(
+                    DATABASE_NAMES,
+                    ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+            connectorConfig =
+                connectorConfig.withValue(
+                    TABLE_NAMES,
+                    ConfigValueFactory.fromIterable(
+                        mergeDatabaseAndTables(dataSourceOption)));
+
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                connectorConfig =
+                    connectorConfig.withValue(
+                        FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+            } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+                // getString throws when the key is absent, so only read a format that is set.
+                && connectorConfig.hasPath(FORMAT_KEY)
+                && connectorConfig
+                .getString(FORMAT_KEY)
+                .toUpperCase(Locale.ROOT)
+                .equals(DEBEZIUM_FORMAT)) {
+                connectorConfig =
+                    connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+            }
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+
+    /**
+     * Builds a config whose {@code fields} entry maps {@code topic}, {@code key} and
+     * {@code value} to the type {@code string}.
+     */
+    private Config generateDebeziumFormatSchema() {
+        List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+        fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+        fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+        Config schema = ConfigFactory.empty();
+        for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+            schema =
+                schema.withValue(
+                    virtualTableFieldRes.getFieldName(),
+                    ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+        }
+        return schema.atKey("fields");
+    }
+
+    /**
+     * Builds the table-name list by pairing every selected database with every selected table.
+     *
+     * <p>A table name with three dot-separated fragments is taken as is and added only once, even
+     * when several databases are selected. A name with two fragments is prefixed with each
+     * selected database in turn.
+     *
+     * @param dataSourceOption the selected databases and tables
+     * @return the collected table names, in iteration order
+     * @throws IllegalArgumentException if a table name has neither two nor three fragments
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+        List<String> tables = new ArrayList<>();
+        for (String database : dataSourceOption.getDatabases()) {
+            for (String table : dataSourceOption.getTables()) {
+                final String[] tableFragments = table.split("\\.");
+                if (tableFragments.length == 3) {
+                    // Already fully qualified: the outer loop would otherwise add it once per
+                    // database, so skip names that are already present.
+                    if (!tables.contains(table)) {
+                        tables.add(table);
+                    }
+                } else if (tableFragments.length == 2) {
+                    tables.add(getDatabaseAndTable(database, table));
+                } else {
+                    throw new IllegalArgumentException(
+                        "Illegal sqlserver table-name: " + table);
+                }
+            }
+        }
+        return tables;
+    }
+
+    /** Joins a database name and a table name with a single dot. */
+    private String getDatabaseAndTable(String database, String table) {
+        return String.format("%s.%s", database, table);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d5d492f7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+
+/**
+ * Config switcher for SqlServer data sources. It adds no behavior of its own on top of
+ * {@code AbstractDataSourceConfigSwitcher} and is exposed as a singleton through
+ * {@link #getInstance()}.
+ */
+public class SqlServerDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private static final SqlServerDataSourceConfigSwitcher INSTANCE =
+        new SqlServerDataSourceConfigSwitcher();
+
+    private SqlServerDataSourceConfigSwitcher() {
+    }
+
+    /**
+     * Returns the shared instance of this switcher.
+     *
+     * @return the singleton instance
+     */
+    public static final SqlServerDataSourceConfigSwitcher getInstance() {
+        return INSTANCE;
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..9cd11265
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class StarRocksDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+    private static final String TABLE = "table";
+
+    private static final String DATABASE = "database";
+
+    private StarRocksDataSourceConfigSwitcher() {
+    }
+
+    public static final StarRocksDataSourceConfigSwitcher INSTANCE =
+        new StarRocksDataSourceConfigSwitcher();
+
+    @Override
+    public FormStructure filterOptionRule(
+        String connectorName,
+        OptionRule dataSourceOptionRule,
+        OptionRule virtualTableOptionRule,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        OptionRule connectorOptionRule,
+        List<String> excludedKeys) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            throw new UnsupportedOperationException("Unsupported PluginType: " + pluginType);
+        } else if (PluginType.SINK.equals(pluginType)) {
+            excludedKeys.addAll(Arrays.asList(TABLE, DATABASE));
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+        return super.filterOptionRule(
+            connectorName,
+            dataSourceOptionRule,
+            virtualTableOptionRule,
+            businessMode,
+            pluginType,
+            connectorOptionRule,
+            excludedKeys);
+    }
+
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        if (PluginType.SOURCE.equals(pluginType)) {
+            throw new UnsupportedOperationException("Unsupported PluginType: " + pluginType);
+        } else if (PluginType.SINK.equals(pluginType)) {
+            if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+                // Add Table
+                connectorConfig =
+                    connectorConfig.withValue(
+                        TABLE,
+                        ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)));
+            }
+            connectorConfig =
+                connectorConfig.withValue(
+                    DATABASE,
+                    ConfigValueFactory.fromAnyRef(dataSourceOption.getDatabases().get(0)));
+        } else {
+            throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+        }
+
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorConfig);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..17dfcef6
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+/**
+ * Datasource config switcher for TiDB.
+ *
+ * <p>Before delegating to {@link BaseJdbcDataSourceConfigSwitcher}, it attaches a
+ * {@code catalog} section (factory, username, password, base-url) built from the datasource
+ * instance config to the connector config.
+ */
+public class TidbDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+    /** Shared instance; the constructor is private. */
+    public static final TidbDataSourceConfigSwitcher INSTANCE = new TidbDataSourceConfigSwitcher();
+
+    private static final String FACTORY = "factory";
+
+    private static final String CATALOG = "catalog";
+    private static final String USERNAME = "username";
+    private static final String PASSWORD = "password";
+    private static final String BASE_URL = "base-url";
+
+    private static final String TABLE_NAMES = "table-names";
+
+    private static final String DATABASE_NAMES = "database-names";
+
+    private TidbDataSourceConfigSwitcher() {
+    }
+
+    /**
+     * Adds the TiDB catalog options to {@code connectorConfig} and then runs the parent merge.
+     *
+     * <p>The catalog section reads {@code user}, {@code password} and {@code url} from
+     * {@code dataSourceInstanceConfig} and stores them as {@code username}, {@code password}
+     * and {@code base-url}, together with the fixed factory name {@code TiDB}.
+     *
+     * @return the value returned by the parent implementation for the enriched connector config
+     */
+    @Override
+    public Config mergeDatasourceConfig(
+        Config dataSourceInstanceConfig,
+        VirtualTableDetailRes virtualTableDetail,
+        DataSourceOption dataSourceOption,
+        SelectTableFields selectTableFields,
+        BusinessMode businessMode,
+        PluginType pluginType,
+        Config connectorConfig) {
+        // Add TiDB catalog options
+        Config catalogOptions =
+            ConfigFactory.empty()
+                .withValue(FACTORY, ConfigValueFactory.fromAnyRef("TiDB"))
+                .withValue(
+                    USERNAME,
+                    ConfigValueFactory.fromAnyRef(dataSourceInstanceConfig.getString("user")))
+                .withValue(
+                    PASSWORD,
+                    ConfigValueFactory.fromAnyRef(dataSourceInstanceConfig.getString(PASSWORD)))
+                .withValue(
+                    BASE_URL,
+                    ConfigValueFactory.fromAnyRef(dataSourceInstanceConfig.getString("url")));
+        Config connectorWithCatalog = connectorConfig.withValue(CATALOG, catalogOptions.root());
+
+        return super.mergeDatasourceConfig(
+            dataSourceInstanceConfig,
+            virtualTableDetail,
+            dataSourceOption,
+            selectTableFields,
+            businessMode,
+            pluginType,
+            connectorWithCatalog);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java
new file mode 100644
index 00000000..c761c9db
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.exceptions;
+
+import lombok.NonNull;
+
+/**
+ * Unchecked exception whose message reports that a given form name, label and type name
+ * combination is not yet supported.
+ */
+public class UnSupportWrapperException extends RuntimeException {
+    private static final String MESSAGE_TEMPLATE =
+            "Form: %s, label: %s, typeName: %s not yet supported";
+
+    /**
+     * @param formName name of the form being built
+     * @param label label (option key) of the unsupported element
+     * @param typeName type name of the unsupported element
+     */
+    public UnSupportWrapperException(
+            @NonNull String formName, @NonNull String label, @NonNull String typeName) {
+        super(String.format(MESSAGE_TEMPLATE, formName, label, typeName));
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java
new file mode 100644
index 00000000..63d69ddb
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import org.apache.seatunnel.app.dynamicforms.AbstractFormOption;
+import org.apache.seatunnel.app.dynamicforms.Constants;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FormOptionSort {
+    /**
+     * Sort form structure.
+     */
+    public static FormStructure sortFormStructure(@NonNull FormStructure formStructure) {
+        List<AbstractFormOption> newFormOptions = new ArrayList<>();
+        List<AbstractFormOption> formOptions = formStructure.getForms();
+        formOptions.forEach(currFormOption -> {
+            if (currFormOption.getShow() != null && currFormOption.getShow().size() > 0) {
+                return;
+            }
+            addShowOptionAfter(currFormOption, formOptions, newFormOptions);
+        });
+
+        return FormStructure.builder().name(formStructure.getName()).withLocale(formStructure.getLocales()).addFormOption(newFormOptions.toArray(new AbstractFormOption[0])).build();
+    }
+
+    public static void addShowOptionAfter(@NonNull AbstractFormOption currFormOption, @NonNull List<AbstractFormOption> allFormOptions, @NonNull List<AbstractFormOption> newFormOptions) {
+        if (newFormOptions.contains(currFormOption)) {
+            return;
+        }
+
+        newFormOptions.add(currFormOption);
+
+        List<AbstractFormOption> showOptions = allFormOptions.stream().filter(nextOption -> {
+            return nextOption.getShow() != null && nextOption.getShow().size() > 0 && nextOption.getShow().get(Constants.SHOW_FIELD).toString().equals(currFormOption.getField());
+        }).collect(Collectors.toList());
+
+        if (CollectionUtils.isEmpty(showOptions)) {
+            return;
+        }
+
+        for (AbstractFormOption showOption : showOptions) {
+            addShowOptionAfter(showOption, allFormOptions, newFormOptions);
+        }
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java
new file mode 100644
index 00000000..9ef90143
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorFeature;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+public class PluginDiscoveryUtil {
+
+    public static List<ConnectorInfo> getAllConnectorsFromPluginMapping(PluginType pluginType) {
+        Map<PluginIdentifier, String> plugins = AbstractPluginDiscovery.getAllSupportedPlugins(pluginType);
+        List<ConnectorInfo> connectorInfos = new ArrayList<>();
+        plugins.forEach((plugin, artifactId) -> connectorInfos.add(new ConnectorInfo(plugin, artifactId)));
+        return connectorInfos;
+    }
+
+    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {
+        Common.setStarter(true);
+        if (!pluginType.equals(PluginType.SOURCE)) {
+            throw new UnsupportedOperationException("ONLY support plugin type source");
+        }
+        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
+        List<Factory> factories;
+        if (path.toFile().exists()) {
+            List<URL> files = FileUtils.searchJarFiles(path);
+            factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
+        } else {
+            factories = FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
+        }
+        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
+        factories.forEach(plugin -> {
+            if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
+                TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
+                PluginIdentifier info = PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), plugin.factoryIdentifier());
+                featureMap.put(info, new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));
+            }
+        });
+        return featureMap;
+    }
+
+    public static List<ConnectorInfo> getDownloadedConnectors(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins, @NonNull PluginType pluginType) throws IOException {
+        LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(pluginType);
+        if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+            return new ArrayList<>();
+        }
+
+        List<ConnectorInfo> connectorInfos = new ArrayList<>();
+        pluginIdentifierOptionRuleLinkedHashMap.forEach((plugin, optionRule) -> connectorInfos.add(new ConnectorInfo(plugin, null)));
+        return connectorInfos;
+    }
+
+    public static List<ConnectorInfo> getTransforms(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins) throws IOException {
+        LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(PluginType.TRANSFORM);
+        if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+            return new ArrayList<>();
+        }
+        return pluginIdentifierOptionRuleLinkedHashMap.keySet().stream().map(t -> new ConnectorInfo(t, null)).collect(Collectors.toList());
+    }
+
+    public static Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllConnectors() throws IOException {
+        return new SeaTunnelSinkPluginDiscovery().getAllPlugin();
+    }
+
+    public static ConcurrentMap<String, FormStructure> getDownloadedConnectorFormStructures(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins, @NonNull PluginType pluginType) {
+        LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(pluginType);
+        ConcurrentMap<String, FormStructure> result = new ConcurrentHashMap<>();
+        if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+            return result;
+        }
+
+        pluginIdentifierOptionRuleLinkedHashMap.forEach((key, value) -> result.put(key.getPluginName(), SeaTunnelOptionRuleWrapper.wrapper(value, key.getPluginName(), pluginType)));
+
+        return result;
+    }
+
+    public static ConcurrentMap<String, FormStructure> getTransformFormStructures(Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins) {
+        return getDownloadedConnectorFormStructures(allPlugins, PluginType.TRANSFORM);
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java
new file mode 100644
index 00000000..d54f3a4e
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import static org.apache.seatunnel.app.common.SeaTunnelConnectorI18n.CONNECTOR_I18N_CONFIG_EN;
+import static org.apache.seatunnel.app.common.SeaTunnelConnectorI18n.CONNECTOR_I18N_CONFIG_ZH;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.dynamicforms.AbstractFormOption;
+import org.apache.seatunnel.app.dynamicforms.FormLocale;
+import org.apache.seatunnel.app.dynamicforms.FormOptionBuilder;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.dynamicforms.FormStructureBuilder;
+import org.apache.seatunnel.app.dynamicforms.validate.ValidateBuilder;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+public class SeaTunnelOptionRuleWrapper {
+    /**
+     * Builds the form structure for a connector's option rule.
+     *
+     * <p>The form is named {@code connectorName + "[" + pluginType.getType() + "]"}.
+     */
+    public static FormStructure wrapper(
+        @NonNull OptionRule optionRule,
+        @NonNull String connectorName,
+        @NonNull PluginType pluginType) {
+        List<Option<?>> optionalOptions = optionRule.getOptionalOptions();
+        List<RequiredOption> requiredOptions = optionRule.getRequiredOptions();
+        String formName = connectorName + "[" + pluginType.getType() + "]";
+        return wrapper(optionalOptions, requiredOptions, formName);
+    }
+
+    /**
+     * Builds the form structure from explicit optional and required option lists.
+     *
+     * <p>The form is named {@code connectorName + "[" + pluginType.getType() + "]"}.
+     */
+    public static FormStructure wrapper(
+        @NonNull List<Option<?>> optionList,
+        @NonNull List<RequiredOption> requiredList,
+        @NonNull String connectorName,
+        @NonNull PluginType pluginType) {
+        String formName = connectorName + "[" + pluginType.getType() + "]";
+        return wrapper(optionList, requiredList, formName);
+    }
+
+    public static FormStructure wrapper(@NonNull OptionRule optionRule, @NonNull String name) {
+        return wrapper(optionRule.getOptionalOptions(), optionRule.getRequiredOptions(), name);
+    }
+
+    /**
+     * Builds the sorted form structure named {@code name}.
+     *
+     * <p>Optional options are converted first, then required options; both share one
+     * {@link FormLocale}. In the resulting form the required groups are added before the
+     * optional options, and the whole structure is passed through
+     * {@link FormOptionSort#sortFormStructure(FormStructure)}.
+     *
+     * @param optionList optional options of the connector
+     * @param requiredList required option groups of the connector
+     * @param name name of the form
+     * @return the sorted form structure
+     */
+    public static FormStructure wrapper(
+        @NonNull List<Option<?>> optionList,
+        @NonNull List<RequiredOption> requiredList,
+        @NonNull String name) {
+        FormLocale locale = new FormLocale();
+        List<AbstractFormOption> optionalForms = wrapperOptionOptions(name, optionList, locale);
+        List<List<AbstractFormOption>> requiredGroups =
+            wrapperRequiredOptions(name, requiredList, locale);
+
+        FormStructureBuilder builder = FormStructure.builder().name(name);
+
+        if (CollectionUtils.isNotEmpty(requiredGroups)) {
+            for (List<AbstractFormOption> group : requiredGroups) {
+                // Empty groups contribute nothing to the form.
+                if (CollectionUtils.isNotEmpty(group)) {
+                    builder.addFormOption(group.toArray(new AbstractFormOption[0]));
+                }
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(optionalForms)) {
+            builder.addFormOption(optionalForms.toArray(new AbstractFormOption[0]));
+        }
+
+        builder.withLocale(locale);
+
+        FormStructure unsorted = builder.build();
+        return FormOptionSort.sortFormStructure(unsorted);
+    }
+
+    /**
+     * Converts every optional option into a form option, in list order, without attaching any
+     * validator.
+     */
+    private static List<AbstractFormOption> wrapperOptionOptions(
+        @NonNull String connectorName, @NonNull List<Option<?>> optionList, FormLocale locale) {
+        List<AbstractFormOption> formOptions = new ArrayList<>();
+        for (Option<?> option : optionList) {
+            formOptions.add(wrapperToFormOption(connectorName, option, locale));
+        }
+        return formOptions;
+    }
+
+    /**
+     * Converts every required-option group into a list of form options with the validator that
+     * matches the kind of group:
+     *
+     * <ul>
+     *   <li>absolutely required - non-empty validator on each option;
+     *   <li>bundled - union-non-empty validator over the keys of the whole bundle;
+     *   <li>exclusive - mutually-exclusive validator over the keys of the whole group;
+     *   <li>conditional - non-empty validator plus a {@code show} condition on the controlling
+     *       key with all expected values of the expression chain.
+     * </ul>
+     *
+     * @return one list per entry of {@code requiredList}, in the same order
+     * @throws UnSupportWrapperException if a group is of none of the kinds above
+     */
+    private static List<List<AbstractFormOption>> wrapperRequiredOptions(
+        @NonNull String connectorName,
+        @NonNull List<RequiredOption> requiredList,
+        FormLocale locale) {
+        List<List<AbstractFormOption>> formOptionsList = new ArrayList<>();
+
+        for (RequiredOption option : requiredList) {
+            List<AbstractFormOption> group = new ArrayList<>();
+
+            if (option instanceof RequiredOption.AbsolutelyRequiredOptions) {
+                RequiredOption.AbsolutelyRequiredOptions absolute =
+                    (RequiredOption.AbsolutelyRequiredOptions) option;
+                for (Option<?> requiredOption : absolute.getRequiredOption()) {
+                    group.add(
+                        wrapperToFormOption(connectorName, requiredOption, locale)
+                            .withValidate(
+                                ValidateBuilder.builder()
+                                    .nonEmptyValidateBuilder()
+                                    .nonEmptyValidate()));
+                }
+            } else if (option instanceof RequiredOption.BundledRequiredOptions) {
+                List<Option<?>> bundledRequiredOptions =
+                    ((RequiredOption.BundledRequiredOptions) option).getRequiredOption();
+                List<String> bundledFields = new ArrayList<>();
+                for (Option<?> bundled : bundledRequiredOptions) {
+                    bundledFields.add(bundled.key());
+                }
+
+                for (Option<?> requiredOption : bundledRequiredOptions) {
+                    AbstractFormOption bundledForm =
+                        wrapperToFormOption(connectorName, requiredOption, locale);
+                    // The return value of withValidate is intentionally not used here; the
+                    // option object itself is what ends up in the group.
+                    bundledForm.withValidate(
+                        ValidateBuilder.builder()
+                            .unionNonEmptyValidateBuilder()
+                            .fields(bundledFields.toArray(new String[0]))
+                            .unionNonEmptyValidate());
+                    group.add(bundledForm);
+                }
+            } else if (option instanceof RequiredOption.ExclusiveRequiredOptions) {
+                List<Option<?>> exclusiveOptions =
+                    ((RequiredOption.ExclusiveRequiredOptions) option).getExclusiveOptions();
+                List<String> exclusiveFields = new ArrayList<>();
+                for (Option<?> exclusive : exclusiveOptions) {
+                    exclusiveFields.add(exclusive.key());
+                }
+
+                for (Option<?> requiredOption : exclusiveOptions) {
+                    AbstractFormOption exclusiveForm =
+                        wrapperToFormOption(connectorName, requiredOption, locale)
+                            .withValidate(
+                                ValidateBuilder.builder()
+                                    .mutuallyExclusiveValidateBuilder()
+                                    .fields(exclusiveFields.toArray(new String[0]))
+                                    .mutuallyExclusiveValidate());
+                    group.add(exclusiveForm);
+                }
+            } else if (option instanceof RequiredOption.ConditionalRequiredOptions) {
+                RequiredOption.ConditionalRequiredOptions conditional =
+                    (RequiredOption.ConditionalRequiredOptions) option;
+
+                // We only support one field controlling a form option, so the condition key
+                // is taken from the first expression only. All expressions are combined with
+                // 'or' and every condition has the same key.
+                String conditionKey =
+                    conditional.getExpression().getCondition().getOption().key();
+
+                // Walk the expression chain and collect each expected value as a string.
+                List<Object> expectValueList = new ArrayList<>();
+                Expression cursor = conditional.getExpression();
+                while (true) {
+                    expectValueList.add(cursor.getCondition().getExpectValue().toString());
+                    if (!cursor.hasNext()) {
+                        break;
+                    }
+                    cursor = cursor.getNext();
+                }
+
+                for (Option<?> requiredOption : conditional.getRequiredOption()) {
+                    group.add(
+                        wrapperToFormOption(connectorName, requiredOption, locale)
+                            .withShow(conditionKey, expectValueList)
+                            .withValidate(
+                                ValidateBuilder.builder()
+                                    .nonEmptyValidateBuilder()
+                                    .nonEmptyValidate()));
+                }
+            } else {
+                throw new UnSupportWrapperException(connectorName, "Unknown", option.toString());
+            }
+
+            formOptionsList.add(group);
+        }
+
+        return formOptionsList;
+    }
+
+    /**
+     * Picks the form widget for {@code option} from its declared type: Boolean, single-choice
+     * and enum become selects; numbers, Duration and String become password, textarea or text
+     * inputs; List, Map, Options and other org.apache.seatunnel classes become textareas.
+     *
+     * @throws UnSupportWrapperException when the type matches none of the above
+     */
+    private static AbstractFormOption wrapperToFormOption(
+        @NonNull String connectorName, @NonNull Option<?> option, @NonNull FormLocale locale) {
+        java.lang.reflect.Type type = option.typeReference().getType();
+        String typeName = type.getTypeName();
+
+        if (Boolean.class.equals(type)) {
+            List<ImmutablePair> booleans =
+                Arrays.asList(new ImmutablePair("true", true), new ImmutablePair("false", false));
+            return selectInput(connectorName, option, booleans, locale);
+        }
+
+        if (Double.class.equals(type)
+            || Duration.class.equals(type)
+            || Float.class.equals(type)
+            || Integer.class.equals(type)
+            || Long.class.equals(type)
+            || String.class.equals(type)) {
+            // Only an option whose key is "password" (in any letter case) gets a password input.
+            if (option.key().toLowerCase(Locale.ROOT).equals("password")) {
+                return passwordInput(connectorName, option, locale);
+            }
+            if (option.defaultValue() != null && option.defaultValue().toString().contains("\n")) {
+                return textareaInput(connectorName, option, locale);
+            }
+            return textInput(connectorName, option, locale);
+        }
+
+        if (typeName.startsWith("java.util.List")
+            || typeName.startsWith("java.util.Map")
+            || typeName.startsWith("org.apache.seatunnel.api.configuration.Options")) {
+            return textareaInput(connectorName, option, locale);
+        }
+
+        if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
+            List<?> optionValues = ((SingleChoiceOption<?>) option).getOptionValues();
+            List<ImmutablePair> staticSelectOptions =
+                optionValues.stream()
+                    .map(o -> new ImmutablePair(o.toString(), o.toString()))
+                    .collect(Collectors.toList());
+            return selectInput(connectorName, option, staticSelectOptions, locale);
+        }
+
+        // Only a plain Class can be an enum or a SeaTunnel class. Casting any other Type (for
+        // example a parameterized Set<String>) used to fail with a ClassCastException instead
+        // of reaching the UnSupportWrapperException at the end of this method.
+        if (type instanceof Class && ((Class<?>) type).isEnum()) {
+            List<ImmutablePair> staticSelectOptions =
+                Arrays.stream(((Class<?>) type).getEnumConstants())
+                    .map(o -> new ImmutablePair(o.toString(), o.toString()))
+                    .collect(Collectors.toList());
+            return selectInput(connectorName, option, staticSelectOptions, locale);
+        }
+
+        if (type instanceof Class && typeName.startsWith("org.apache.seatunnel")) {
+            return textareaInput(connectorName, option, locale);
+        }
+
+        throw new UnSupportWrapperException(connectorName, option.key(), typeName);
+    }
+
+    /**
+     * Copies the Chinese and English texts configured for {@code optionI18nKey} into
+     * {@code locale}; the connector is looked up by the part of its name before any "[".
+     * @return true when at least one of the two i18n configs has a text for the key
+     */
+    private static boolean enableLabelI18n(
+        String connectorName, String optionI18nKey, FormLocale locale) {
+        int bracket = connectorName.indexOf("[");
+        String lookupName = bracket < 0 ? connectorName : connectorName.substring(0, bracket);
+        boolean hasZh =
+            CONNECTOR_I18N_CONFIG_ZH.hasPath(lookupName)
+                && CONNECTOR_I18N_CONFIG_ZH.getConfig(lookupName).hasPath(optionI18nKey);
+        if (hasZh) {
+            String zh = CONNECTOR_I18N_CONFIG_ZH.getConfig(lookupName).getString(optionI18nKey);
+            locale.addZhCN(optionI18nKey, zh);
+        }
+        boolean hasEn =
+            CONNECTOR_I18N_CONFIG_EN.hasPath(lookupName)
+                && CONNECTOR_I18N_CONFIG_EN.getConfig(lookupName).hasPath(optionI18nKey);
+        if (hasEn) {
+            String en = CONNECTOR_I18N_CONFIG_EN.getConfig(lookupName).getString(optionI18nKey);
+            locale.addEnUS(optionI18nKey, en);
+        }
+        return hasZh || hasEn;
+    }
+
+    /**
+     * Builds a select for {@code option} that offers {@code staticSelectOptions}.
+     *
+     * <p>The label, every choice and the placeholder use their i18n key when
+     * {@link #enableLabelI18n} finds one, else the raw key, choice and option description.
+     */
+    private static AbstractFormOption selectInput(
+        String connectorName,
+        @NonNull Option<?> option,
+        List<ImmutablePair> staticSelectOptions,
+        FormLocale locale) {
+        String labelKey = option.key().replace(".", "_").replace("-", "_");
+        String placeholderKey = labelKey + "_description";
+
+        FormOptionBuilder labelled = FormOptionBuilder.builder();
+        labelled =
+            enableLabelI18n(connectorName, labelKey, locale)
+                ? labelled.withI18nLabel(labelKey)
+                : labelled.withLabel(option.key());
+
+        FormOptionBuilder.StaticSelectOptionBuilder choices =
+            labelled.withField(option.key()).staticSelectOptionBuilder();
+        staticSelectOptions.forEach(
+            choice -> {
+                if (enableLabelI18n(connectorName, choice.getLeft().toString(), locale)) {
+                    choices.addI18nSelectOptions(choice);
+                } else {
+                    choices.addSelectOptions(choice);
+                }
+            });
+
+        Object defaultValue = option.defaultValue();
+        AbstractFormOption select =
+            choices.formStaticSelectOption()
+                .withDefaultValue(defaultValue == null ? null : defaultValue.toString());
+
+        return enableLabelI18n(connectorName, placeholderKey, locale)
+            ? select.withI18nPlaceholder(placeholderKey)
+            : select.withPlaceholder(option.getDescription());
+    }
+
+    /**
+     * Builds a password input for {@code option}; label and placeholder use their i18n key when
+     * {@link #enableLabelI18n} finds one, else the raw option key and the option description.
+     */
+    private static AbstractFormOption passwordInput(
+        String connectorName, @NonNull Option<?> option, FormLocale locale) {
+        String labelKey = option.key().replace(".", "_").replace("-", "_");
+        String placeholderKey = labelKey + "_description";
+
+        FormOptionBuilder labelled = FormOptionBuilder.builder();
+        labelled =
+            enableLabelI18n(connectorName, labelKey, locale)
+                ? labelled.withI18nLabel(labelKey)
+                : labelled.withLabel(option.key());
+
+        AbstractFormOption input =
+            labelled.withField(option.key())
+                .inputOptionBuilder()
+                .formPasswordInputOption()
+                .withDefaultValue(option.defaultValue());
+
+        return enableLabelI18n(connectorName, placeholderKey, locale)
+            ? input.withI18nPlaceholder(placeholderKey)
+            : input.withPlaceholder(option.getDescription());
+    }
+
+    /**
+     * Builds a text input for {@code option}; label and placeholder use their i18n key when
+     * {@link #enableLabelI18n} finds one, else the raw option key and the option description.
+     */
+    private static AbstractFormOption textInput(
+        String connectorName, @NonNull Option<?> option, FormLocale locale) {
+        String labelKey = option.key().replace(".", "_").replace("-", "_");
+        String placeholderKey = labelKey + "_description";
+
+        FormOptionBuilder labelled = FormOptionBuilder.builder();
+        labelled =
+            enableLabelI18n(connectorName, labelKey, locale)
+                ? labelled.withI18nLabel(labelKey)
+                : labelled.withLabel(option.key());
+
+        AbstractFormOption input =
+            labelled.withField(option.key())
+                .inputOptionBuilder()
+                .formTextInputOption()
+                .withDefaultValue(option.defaultValue());
+        return enableLabelI18n(connectorName, placeholderKey, locale)
+            ? input.withI18nPlaceholder(placeholderKey)
+            : input.withPlaceholder(option.getDescription());
+    }
+
+    /**
+     * Builds a clearable textarea for {@code option}; label and placeholder use their i18n key
+     * when {@link #enableLabelI18n} finds one, else the raw option key and option description.
+     */
+    private static AbstractFormOption textareaInput(
+        String connectorName, @NonNull Option<?> option, FormLocale locale) {
+        String labelKey = option.key().replace(".", "_").replace("-", "_");
+        String placeholderKey = labelKey + "_description";
+
+        FormOptionBuilder labelled = FormOptionBuilder.builder();
+        labelled =
+            enableLabelI18n(connectorName, labelKey, locale)
+                ? labelled.withI18nLabel(labelKey)
+                : labelled.withLabel(option.key());
+
+        AbstractFormOption input =
+            labelled.withField(option.key())
+                .inputOptionBuilder()
+                .formTextareaInputOption()
+                .withClearable()
+                .withDefaultValue(option.defaultValue());
+
+        return enableLabelI18n(connectorName, placeholderKey, locale)
+            ? input.withI18nPlaceholder(placeholderKey)
+            : input.withPlaceholder(option.getDescription());
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java
new file mode 100644
index 00000000..8db5be03
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import lombok.NonNull;
+
+/** Signals a form option whose form, label and type combination is not yet supported. */
+public class UnSupportWrapperException extends RuntimeException {
+    private static final String FORMAT = "Form: %s, label: %s, typeName: %s not yet supported";
+    /** Builds the message from the form name, the option label and the unsupported type name. */
+    public UnSupportWrapperException(
+            @NonNull String formName, @NonNull String label, @NonNull String typeName) {
+        super(String.format(FORMAT, formName, label, typeName));
+    }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java
new file mode 100644
index 00000000..56e1af59
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.regex.Pattern;
+
+public class JdbcUtils {
+    /** ClickHouse jdbc url; group 1 is the host and group 2 the port. Compiled once, reused. */
+    private static final Pattern CLICKHOUSE_URL =
+            Pattern.compile("jdbc:clickhouse:\\/\\/(.*):([0-9]+)(.*)");
+
+    /**
+     * Get the {@code host:port} address from a ClickHouse jdbc url.
+     *
+     * @param url jdbc:clickhouse://localhost:8123
+     * @return localhost:8123; an url the pattern does not match is returned unchanged
+     */
+    public static String getAddressFromUrl(String url) {
+        checkNotNull(url, "url can not be null");
+        return CLICKHOUSE_URL.matcher(url).replaceAll("$1:$2");
+    }
+
+    /** Swaps the database of jdbcUrl for databaseName (no-op when null), keeping the query. */
+    public static String replaceDatabase(String jdbcUrl, String databaseName) {
+        if (databaseName == null) {
+            return jdbcUrl;
+        }
+        // Cut at the first '?' only, so a '?' inside a parameter value no longer drops the rest.
+        String[] split = jdbcUrl.split("\\?", 2);
+        String replaced = replaceDatabaseWithoutParameter(split[0], databaseName);
+        return split.length == 1 || split[1].isEmpty() ? replaced : replaced + "?" + split[1];
+    }
+
+    /** Keeps jdbcUrl up to the first '/' after the host part and appends databaseName. */
+    private static String replaceDatabaseWithoutParameter(String jdbcUrl, String databaseName) {
+        // Search after "://" so "jdbc:mysql://host/db" (no port) is not cut inside the "//".
+        int scheme = jdbcUrl.indexOf("://");
+        int from = scheme >= 0 ? scheme + 3 : jdbcUrl.lastIndexOf(':') + 1;
+        int slash = jdbcUrl.indexOf('/', from);
+        return (slash < 0 ? jdbcUrl + "/" : jdbcUrl.substring(0, slash + 1)) + databaseName;
+    }
+}
diff --git a/seatunnel-web-dist/release-docs/LICENSE b/seatunnel-web-dist/release-docs/LICENSE
index ad7f6d8c..a4338a4b 100644
--- a/seatunnel-web-dist/release-docs/LICENSE
+++ b/seatunnel-web-dist/release-docs/LICENSE
@@ -222,6 +222,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
      (Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.12.0 - http://commons.apache.org/proper/commons-lang/)
+     (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.11.0 - https://mvnrepository.com/artifact/commons-io/commons-io/2.11.0)
      (Apache License, Version 2.0) config (com.typesafe:config:1.3.3 - https://github.com/lightbend/config)
      (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
      (Apache License 2.0) Hibernate Validator Engine (org.hibernate.validator:hibernate-validator:6.2.2.Final - http://hibernate.org/validator/hibernate-validator)
diff --git a/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml b/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
index a13174fc..2260084f 100644
--- a/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
+++ b/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
@@ -86,6 +86,19 @@
             <scope>provided</scope>
         </dependencySet>
 
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <includes>
+                <include>*:*:jar</include>
+            </includes>
+            <excludes>
+                <exclude>org.apache.seatunnel:datasource-*:jar</exclude>
+            </excludes>
+            <outputDirectory>/libs</outputDirectory>
+        </dependencySet>
+
         <!-- ============ Logging Jars ============  -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
diff --git a/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml b/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
index 5cc991a4..4a726e68 100644
--- a/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
+++ b/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
@@ -86,6 +86,19 @@
             <scope>provided</scope>
         </dependencySet>
 
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <includes>
+                <include>*:*:jar</include>
+            </includes>
+            <excludes>
+                <exclude>org.apache.seatunnel:datasource-*:jar</exclude>
+            </excludes>
+            <outputDirectory>/libs</outputDirectory>
+        </dependencySet>
+
         <!-- ============ Logging Jars ============  -->
         <dependencySet>
             <useProjectArtifact>false</useProjectArtifact>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 6b69a0f1..0f6896f7 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -4,8 +4,10 @@ classmate-1.3.1.jar
 commons-collections4-4.4.jar
 commons-codec-1.13.jar
 commons-lang3-3.12.0.jar
+commons-io-2.11.0.jar
 config-1.3.3.jar
 cron-utils-9.1.6.jar
+gson-2.8.6.jar
 guava-19.0.jar
 hibernate-validator-6.2.2.Final.jar
 jackson-annotations-2.12.7.jar
@@ -53,6 +55,8 @@ seatunnel-common-2.3.1.jar
 seatunnel-config-base-2.3.1.jar
 seatunnel-config-shade-2.3.1.jar
 seatunnel-jackson-2.3.1-optional.jar
+seatunnel-api-2.3.1.jar
+seatunnel-plugin-discovery-2.3.1.jar
 slf4j-api-1.7.25.jar
 snakeyaml-1.29.jar
 spring-aop-5.3.20.jar