You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/05/16 03:55:37 UTC
[incubator-seatunnel-web] branch add_canvas_job_define updated: Add datasource config switcher (#55)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch add_canvas_job_define
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel-web.git
The following commit(s) were added to refs/heads/add_canvas_job_define by this push:
new e0b6e75d Add datasource config switcher (#55)
e0b6e75d is described below
commit e0b6e75d45fe6246843d8f50fe9aff77fb30371d
Author: Eric <ga...@gmail.com>
AuthorDate: Tue May 16 11:55:31 2023 +0800
Add datasource config switcher (#55)
---
.licenserc.yaml | 1 +
pom.xml | 194 ++++++++-
.../datasource-s3/pom.xml | 21 -
.../seatunnel-datasource-plugins/pom.xml | 15 +
seatunnel-server/pom.xml | 116 -----
seatunnel-server/seatunnel-app/pom.xml | 43 +-
.../app/common/SeaTunnelConnectorI18n.java | 44 ++
.../app/domain/request/connector/BusinessMode.java | 24 +
.../app/domain/request/job/DataSourceOption.java | 34 ++
.../app/domain/request/job/SelectTableFields.java | 35 ++
.../seatunnel/app/domain/response/BaseInfo.java | 34 ++
.../response/connector/ConnectorFeature.java | 28 ++
.../domain/response/connector/ConnectorInfo.java | 30 ++
.../domain/response/connector/DataSourceInfo.java | 30 ++
.../response/connector/DataSourceInstance.java | 32 ++
.../response/datasource/DatasourceDetailRes.java | 48 ++
.../domain/response/datasource/DatasourceRes.java | 44 ++
.../response/datasource/VirtualTableDetailRes.java | 58 +++
.../response/datasource/VirtualTableFieldRes.java | 54 +++
.../response/datasource/VirtualTableRes.java | 44 ++
.../AbstractDataSourceConfigSwitcher.java | 163 +++++++
.../datasource/DataSourceClientFactory.java | 39 ++
.../datasource/DataSourceConfigSwitcher.java | 57 +++
.../datasource/DataSourceConfigSwitcherUtils.java | 135 ++++++
.../app/thridparty/datasource/SchemaGenerator.java | 90 ++++
.../impl/BaseJdbcDataSourceConfigSwitcher.java | 186 ++++++++
.../impl/ClickhouseDataSourceConfigSwitcher.java | 152 +++++++
.../ElasticSearchDataSourceConfigSwitcher.java | 125 ++++++
.../impl/KafkaDataSourceConfigSwitcher.java | 129 ++++++
.../impl/MysqlCDCDataSourceConfigSwitcher.java | 179 ++++++++
.../impl/MysqlDatasourceConfigSwitcher.java | 25 ++
.../impl/OracleDataSourceConfigSwitcher.java | 67 +++
.../impl/PostgresCDCDataSourceConfigSwitcher.java | 185 ++++++++
.../impl/PostgresqlDataSourceConfigSwitcher.java | 60 +++
.../impl/RedshiftDataSourceConfigSwitcher.java | 58 +++
.../impl/S3DataSourceConfigSwitcher.java | 185 ++++++++
.../impl/S3RedshiftDataSourceConfigSwitcher.java | 84 ++++
.../impl/SqlServerCDCDataSourceConfigSwitcher.java | 185 ++++++++
.../impl/SqlServerDataSourceConfigSwitcher.java | 33 ++
.../impl/StarRocksDataSourceConfigSwitcher.java | 109 +++++
.../impl/TidbDataSourceConfigSwitcher.java | 82 ++++
.../exceptions/UnSupportWrapperException.java | 30 ++
.../app/thridparty/framework/FormOptionSort.java | 67 +++
.../thridparty/framework/PluginDiscoveryUtil.java | 120 +++++
.../framework/SeaTunnelOptionRuleWrapper.java | 484 +++++++++++++++++++++
.../framework/UnSupportWrapperException.java | 30 ++
.../org/apache/seatunnel/app/utils/JdbcUtils.java | 59 +++
seatunnel-web-dist/release-docs/LICENSE | 1 +
.../src/main/assembly/seatunnel-web-ci.xml | 13 +
.../src/main/assembly/seatunnel-web.xml | 13 +
tools/dependencies/known-dependencies.txt | 4 +
51 files changed, 3929 insertions(+), 149 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index ec23eba6..052f0bc1 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -41,5 +41,6 @@ header:
- '**/.gitkeep'
- '**/com/typesafe/config/**'
- 'seatunnel-main-repository/**'
+ - 'seatunnel-web-dist/release-docs/**'
comment: on-failure
diff --git a/pom.xml b/pom.xml
index b2a4da10..c015a38d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,8 @@
<!-- dependency -->
<commons.logging.version>1.2</commons.logging.version>
<slf4j.version>1.7.25</slf4j.version>
- <jackson.version>2.12.7.1</jackson.version>
+ <jackson-databind.version>2.12.7.1</jackson-databind.version>
+ <jackson.version>2.12.7</jackson.version>
<mysql.version>8.0.16</mysql.version>
<h2.version>2.1.214</h2.version>
<junit.version>5.9.0</junit.version>
@@ -125,6 +126,23 @@
<e2e.dependency.skip>true</e2e.dependency.skip>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<flatten-maven-plugin.version>1.3.0</flatten-maven-plugin.version>
+
+ <spring-boot.version>2.6.8</spring-boot.version>
+ <spring.version>5.3.20</spring.version>
+ <mybatis-plus-boot-starter.version>3.5.3.1</mybatis-plus-boot-starter.version>
+ <druid-spring-boot-starter.version>1.2.9</druid-spring-boot-starter.version>
+ <springfox-swagger.version>2.6.1</springfox-swagger.version>
+ <swagger-annotations.version>1.5.10</swagger-annotations.version>
+ <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
+ <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
+ <jsoup.version>1.14.3</jsoup.version>
+ <jwt.version>0.10.7</jwt.version>
+ <cron-utils.version>9.1.6</cron-utils.version>
+ <commons-io.version>2.11.0</commons-io.version>
+
+ <hadoop-uber.version>2.3.1</hadoop-uber.version>
+ <hadoop-aws.version>3.1.4</hadoop-aws.version>
+ <aws-java-sdk-bundle.version>1.11.271</aws-java-sdk-bundle.version>
</properties>
<dependencyManagement>
@@ -240,6 +258,28 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-plugin-discovery</artifactId>
<version>${seatunnel-framework.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-1.2-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-api</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-core</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -255,6 +295,109 @@
<scope>provided</scope>
</dependency>
+ <!-- springboot and other dependencies -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jetty</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-aop</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jdbc</artifactId>
+ <version>${spring-boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <version>${spring-boot.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid-spring-boot-starter</artifactId>
+ <version>${druid-spring-boot-starter.version}</version>
+ </dependency>
+
+ <!-- ORM -->
+ <dependency>
+ <groupId>com.baomidou</groupId>
+ <artifactId>mybatis-plus-boot-starter</artifactId>
+ <version>${mybatis-plus-boot-starter.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-jdbc</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>spring-boot-autoconfigure</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.hibernate.validator</groupId>
+ <artifactId>hibernate-validator</artifactId>
+ <version>${hibernate.validator.version}</version>
+ </dependency>
+
+ <!-- swagger -->
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>${springfox-swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>${springfox-swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ <version>${swagger-annotations.version}</version>
+ </dependency>
+
+ <!-- JWT -->
+ <dependency>
+ <groupId>io.jsonwebtoken</groupId>
+ <artifactId>jjwt-api</artifactId>
+ <version>${jwt.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.jsonwebtoken</groupId>
+ <artifactId>jjwt-impl</artifactId>
+ <version>${jwt.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.jsonwebtoken</groupId>
+ <artifactId>jjwt-jackson</artifactId>
+ <version>${jwt.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!-- http -->
+ <dependency>
+ <groupId>org.jsoup</groupId>
+ <artifactId>jsoup</artifactId>
+ <version>${jsoup.version}</version>
+ </dependency>
+
<!-- seatunnel connector test -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -296,6 +439,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
@@ -312,7 +460,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <version>${jackson-databind.version}</version>
</dependency>
<dependency>
@@ -350,6 +498,48 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+ <scope>provided</scope>
+ <version>${hadoop-uber.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop-aws.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws-java-sdk-bundle.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.cronutils</groupId>
+ <artifactId>cron-utils</artifactId>
+ <version>${cron-utils.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>javassist</artifactId>
+ <groupId>org.javassist</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
index be117f58..59b39e58 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/pom.xml
@@ -24,12 +24,6 @@
<artifactId>datasource-s3</artifactId>
- <properties>
- <hadoop-uber.version>2.4.4-WS-SNAPSHOT</hadoop-uber.version>
- <hadoop-aws.version>3.1.4</hadoop-aws.version>
- <aws-java-sdk-bundle.version>1.11.271</aws-java-sdk-bundle.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -39,29 +33,14 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
- <version>${hadoop-uber.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
- <version>${hadoop-aws.version}</version>
- <exclusions>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
- <version>${aws-java-sdk-bundle.version}</version>
</dependency>
</dependencies>
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
index 039df6b4..91cdc740 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -1,4 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
diff --git a/seatunnel-server/pom.xml b/seatunnel-server/pom.xml
index 7f1e1def..7bfbf9d6 100644
--- a/seatunnel-server/pom.xml
+++ b/seatunnel-server/pom.xml
@@ -32,120 +32,4 @@
<module>seatunnel-scheduler</module>
<module>seatunnel-server-common</module>
</modules>
-
- <properties>
- <spring-boot.version>2.6.8</spring-boot.version>
- <spring.version>5.3.20</spring.version>
- <mybatis-plus-boot-starter.version>3.5.3.1</mybatis-plus-boot-starter.version>
- <druid-spring-boot-starter.version>1.2.9</druid-spring-boot-starter.version>
- <springfox-swagger.version>2.6.1</springfox-swagger.version>
- <swagger-annotations.version>1.5.10</swagger-annotations.version>
- <hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
- <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
- <jsoup.version>1.14.3</jsoup.version>
- <jwt.version>0.10.7</jwt.version>
- <cron-utils.version>9.1.6</cron-utils.version>
- </properties>
-
- <dependencyManagement>
- <dependencies>
- <!-- springboot -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- <version>${spring-boot.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jetty</artifactId>
- <version>${spring-boot.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- <version>${spring-boot.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- <version>${spring-boot.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.6</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid-spring-boot-starter</artifactId>
- <version>${druid-spring-boot-starter.version}</version>
- </dependency>
-
- <!-- ORM -->
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>${mybatis-plus-boot-starter.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>spring-boot-autoconfigure</artifactId>
- <groupId>org.springframework.boot</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.hibernate.validator</groupId>
- <artifactId>hibernate-validator</artifactId>
- <version>${hibernate.validator.version}</version>
- </dependency>
-
- <!-- swagger -->
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>${springfox-swagger.version}</version>
- </dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>${springfox-swagger.version}</version>
- </dependency>
- <dependency>
- <groupId>io.swagger</groupId>
- <artifactId>swagger-annotations</artifactId>
- <version>${swagger-annotations.version}</version>
- </dependency>
-
- <!-- JWT -->
- <dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt-api</artifactId>
- <version>${jwt.version}</version>
- </dependency>
- <dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt-impl</artifactId>
- <version>${jwt.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt-jackson</artifactId>
- <version>${jwt.version}</version>
- <scope>runtime</scope>
- </dependency>
-
- <!-- http -->
- <dependency>
- <groupId>org.jsoup</groupId>
- <artifactId>jsoup</artifactId>
- <version>${jsoup.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
</project>
\ No newline at end of file
diff --git a/seatunnel-server/seatunnel-app/pom.xml b/seatunnel-server/seatunnel-app/pom.xml
index 7458e589..ca908b06 100644
--- a/seatunnel-server/seatunnel-app/pom.xml
+++ b/seatunnel-server/seatunnel-app/pom.xml
@@ -30,11 +30,19 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-dynamicform</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-plugin-discovery</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-scheduler-dolphinscheduler</artifactId>
@@ -45,6 +53,26 @@
<artifactId>seatunnel-server-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- datasource -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-datasource-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>datasource-plugins-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>datasource-s3</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!--springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -118,13 +146,11 @@
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
- <version>${springfox-swagger.version}</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
- <version>${swagger-annotations.version}</version>
</dependency>
<dependency>
@@ -183,7 +209,6 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
- <version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -204,17 +229,15 @@
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
<!-- https://mvnrepository.com/artifact/com.cronutils/cron-utils -->
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
- <version>${cron-utils.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>javassist</artifactId>
- <groupId>org.javassist</groupId>
- </exclusion>
- </exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java
new file mode 100644
index 00000000..96e4780b
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/common/SeaTunnelConnectorI18n.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.common;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.commons.io.IOUtils;
+
+import java.nio.charset.StandardCharsets;
+
+public class SeaTunnelConnectorI18n {
+ public static Config CONNECTOR_I18N_CONFIG_EN;
+ public static Config CONNECTOR_I18N_CONFIG_ZH;
+
+ static {
+ try {
+ CONNECTOR_I18N_CONFIG_EN = ConfigFactory.parseString(IOUtils.toString(SeaTunnelConnectorI18n.class.getResourceAsStream("/i18n_en.config"), StandardCharsets.UTF_8));
+ CONNECTOR_I18N_CONFIG_ZH =
+ ConfigFactory.parseString(
+ IOUtils.toString(
+ SeaTunnelConnectorI18n.class.getResourceAsStream(
+ "/i18n_zh.config"),
+ StandardCharsets.UTF_8));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java
new file mode 100644
index 00000000..73c906c5
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/connector/BusinessMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.connector;
+
+public enum BusinessMode {
+ DATA_REPLICA,
+
+ DATA_INTEGRATION;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java
new file mode 100644
index 00000000..0cc8ebd9
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/DataSourceOption.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class DataSourceOption {
+
+ private List<String> databases;
+
+ private List<String> tables;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java
new file mode 100644
index 00000000..6c4cd0a0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/SelectTableFields.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.request.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SelectTableFields {
+
+ private boolean all;
+ private List<String> tableFields;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java
new file mode 100644
index 00000000..3fd2dd60
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/BaseInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response;
+
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class BaseInfo {
+
+ private String createUserName;
+
+ private Date createTime;
+
+ private String updateUserName;
+
+ private Date updateTime;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java
new file mode 100644
index 00000000..a77e95d3
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorFeature.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class ConnectorFeature {
+
+ private boolean supportColumnProjection;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java
new file mode 100644
index 00000000..615b2bf0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/ConnectorInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class ConnectorInfo {
+ private PluginIdentifier pluginIdentifier;
+ private String artifactId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java
new file mode 100644
index 00000000..c8cdc912
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DataSourceInfo {
+
+ private ConnectorInfo connectorInfo;
+
+ private String datasourceName;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java
new file mode 100644
index 00000000..963056dc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/connector/DataSourceInstance.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.connector;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class DataSourceInstance {
+
+ private DataSourceInfo dataSourceInfo;
+
+ private String dataSourceInstanceName;
+
+ private Long dataSourceInstanceId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java
new file mode 100644
index 00000000..4298f4bc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceDetailRes.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import lombok.Data;
+
+import java.util.Date;
+import java.util.Map;
+
+@Data
+public class DatasourceDetailRes {
+
+ private String id;
+
+ private String datasourceName;
+
+ private String pluginName;
+
+ private String pluginVersion;
+
+ // todo check configuration
+ private Map<String, String> datasourceConfig;
+
+ private String description;
+
+ private String createUserName;
+
+ private String updateUserName;
+
+ private Date createTime;
+
+ private Date updateTime;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java
new file mode 100644
index 00000000..b702a124
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/DatasourceRes.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class DatasourceRes extends BaseInfo {
+
+ private String id;
+
+ private String datasourceName;
+
+ private String pluginName;
+
+ private String pluginVersion;
+
+ private String description;
+
+ private Map<String, String> datasourceConfig;
+
+ private int createUserId;
+
+ private int updateUserId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java
new file mode 100644
index 00000000..dba9dc6f
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableDetailRes.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@ApiModel(value = "VirtualTable Detail Response", description = "virtual table detail info")
+public class VirtualTableDetailRes extends BaseInfo {
+
+ @ApiModelProperty(value = "table id", required = true, dataType = "String")
+ private String tableId;
+
+ @ApiModelProperty(value = "datasource id", required = true, dataType = "String")
+ private String datasourceId;
+
+ @ApiModelProperty(value = "datasource name", required = true, dataType = "String")
+ private String datasourceName;
+
+ @ApiModelProperty(value = "database name", required = true, dataType = "String")
+ private String databaseName;
+
+ @ApiModelProperty(value = "plugin name", required = true, dataType = "String")
+ private String pluginName;
+
+ @ApiModelProperty(value = "table name", required = true, dataType = "String")
+ private String tableName;
+
+ @ApiModelProperty(value = "table description", dataType = "String")
+ private String description;
+
+ @ApiModelProperty(value = "table properties", dataType = "List")
+ private List<VirtualTableFieldRes> fields;
+
+ private Map<String, String> datasourceProperties;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java
new file mode 100644
index 00000000..09342dec
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableFieldRes.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@ApiModel(value = "VirtualTable Field Response", description = "virtual table field info")
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class VirtualTableFieldRes {
+
+ @ApiModelProperty(value = "field name", required = true, dataType = "String")
+ private String fieldName;
+
+ @ApiModelProperty(value = "field type", required = true, dataType = "String")
+ private String fieldType;
+
+ @ApiModelProperty(value = "field length", required = true, dataType = "Boolean")
+ private Boolean nullable;
+
+ @ApiModelProperty(value = "field default value", dataType = "String")
+ private String defaultValue;
+
+ @ApiModelProperty(value = "primary key", dataType = "Boolean")
+ private Boolean primaryKey;
+
+ @ApiModelProperty(value = "field comment", dataType = "String")
+ private String fieldComment;
+
+ @ApiModelProperty(value = "field extra", dataType = "String")
+ private String fieldExtra;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java
new file mode 100644
index 00000000..9d56bfbf
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/response/datasource/VirtualTableRes.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.domain.response.datasource;
+
+import org.apache.seatunnel.app.domain.response.BaseInfo;
+
+import lombok.Data;
+
+@Data
+public class VirtualTableRes extends BaseInfo {
+
+ private String tableId;
+
+ private String datasourceId;
+
+ private String datasourceName;
+
+ private String databaseName;
+
+ private String tableName;
+
+ private String description;
+
+ private String pluginName;
+
+ private int createUserId;
+
+ private int updateUserId;
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..576b55a8
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/AbstractDataSourceConfigSwitcher.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.exceptions.UnSupportWrapperException;
+import org.apache.seatunnel.app.thridparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class AbstractDataSourceConfigSwitcher implements DataSourceConfigSwitcher {
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+
+ List<String> dataSourceRequiredAllKey =
+ Stream.concat(
+ dataSourceOptionRule.getRequiredOptions().stream(),
+ virtualTableOptionRule.getRequiredOptions().stream())
+ .flatMap(ro -> ro.getOptions().stream().map(Option::key))
+ .collect(Collectors.toList());
+
+ dataSourceRequiredAllKey.addAll(excludedKeys);
+
+ List<RequiredOption> requiredOptions =
+ connectorOptionRule.getRequiredOptions().stream()
+ .map(
+ requiredOption -> {
+ if (requiredOption instanceof RequiredOption.AbsolutelyRequiredOptions) {
+ RequiredOption.AbsolutelyRequiredOptions
+ absolutelyRequiredOptions =
+ (RequiredOption.AbsolutelyRequiredOptions)
+ requiredOption;
+ List<Option<?>> requiredOpList =
+ absolutelyRequiredOptions.getOptions().stream()
+ .filter(
+ op -> {
+ return !dataSourceRequiredAllKey
+ .contains(op.key());
+ })
+ .collect(Collectors.toList());
+ return requiredOpList.isEmpty() ? null
+ : OptionRule.builder()
+ .required(
+ requiredOpList.toArray(
+ new Option<?>[0]))
+ .build()
+ .getRequiredOptions()
+ .get(0);
+ }
+
+ if (requiredOption instanceof RequiredOption.BundledRequiredOptions) {
+ List<Option<?>> bundledRequiredOptions =
+ requiredOption.getOptions();
+ return bundledRequiredOptions.stream()
+ .anyMatch(
+ op ->
+ dataSourceRequiredAllKey
+ .contains(op.key())) ? null
+ : requiredOption;
+ }
+
+ if (requiredOption instanceof RequiredOption.ExclusiveRequiredOptions) {
+ List<Option<?>> exclusiveOptions =
+ requiredOption.getOptions();
+ return exclusiveOptions.stream()
+ .anyMatch(
+ op ->
+ dataSourceRequiredAllKey
+ .contains(op.key())) ? null
+ : requiredOption;
+ }
+
+ if (requiredOption instanceof RequiredOption.ConditionalRequiredOptions) {
+ List<Option<?>> conditionalRequiredOptions =
+ requiredOption.getOptions();
+ return conditionalRequiredOptions.stream()
+ .anyMatch(
+ op ->
+ dataSourceRequiredAllKey
+ .contains(op.key())) ? null
+ : requiredOption;
+ }
+
+ throw new UnSupportWrapperException(
+ connectorName, "Unknown", requiredOption.toString());
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ List<String> dataSourceOptionAllKey =
+ Stream.concat(
+ dataSourceOptionRule.getOptionalOptions().stream(),
+ virtualTableOptionRule.getOptionalOptions().stream())
+ .map(Option::key)
+ .collect(Collectors.toList());
+
+ dataSourceOptionAllKey.addAll(excludedKeys);
+
+ List<Option<?>> optionList =
+ connectorOptionRule.getOptionalOptions().stream()
+ .filter(option -> !dataSourceOptionAllKey.contains(option.key()))
+ .collect(Collectors.toList());
+
+ return SeaTunnelOptionRuleWrapper.wrapper(
+ optionList, requiredOptions, connectorName, pluginType);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+
+ Config mergedConfig = connectorConfig;
+
+ for (Map.Entry<String, ConfigValue> en : dataSourceInstanceConfig.entrySet()) {
+ mergedConfig = mergedConfig.withValue(en.getKey(), en.getValue());
+ }
+
+ return mergedConfig;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java
new file mode 100644
index 00000000..4647e6eb
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceClientFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.datasource.DataSourceClient;
+
+public class DataSourceClientFactory {
+ private static DataSourceClient INSTANCE;
+
+ private static final Object LOCK = new Object();
+
+ public static DataSourceClient getDataSourceClient() {
+ if (null != INSTANCE) {
+ return INSTANCE;
+ }
+ synchronized (LOCK) {
+ if (null != INSTANCE) {
+ return INSTANCE;
+ }
+ INSTANCE = new DataSourceClient();
+ return INSTANCE;
+ }
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d7a4de8f
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+public interface DataSourceConfigSwitcher {
+
+ /**
+ * Use the OptionRule of the data source to filter the OptionRule of the connector
+ */
+ FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys);
+
+ /**
+ * Merge the parameters of the data source instance and connector configuration into the final connector parameters
+ */
+ Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig);
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java
new file mode 100644
index 00000000..2e5b3071
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/DataSourceConfigSwitcherUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.impl.ClickhouseDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.ElasticSearchDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.KafkaDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.MysqlCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.MysqlDatasourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.OracleDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.PostgresCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.PostgresqlDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.RedshiftDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.S3DataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.S3RedshiftDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.SqlServerCDCDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.SqlServerDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.StarRocksDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.impl.TidbDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.ArrayList;
+
+public class DataSourceConfigSwitcherUtils {
+
+ public static FormStructure filterOptionRule(
+ String datasourceName,
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ PluginType pluginType,
+ BusinessMode businessMode,
+ OptionRule connectorOptionRule) {
+ DataSourceConfigSwitcher dataSourceConfigSwitcher =
+ getDataSourceConfigSwitcher(datasourceName.toUpperCase());
+ return dataSourceConfigSwitcher.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ new ArrayList<>());
+ }
+
+ public static Config mergeDatasourceConfig(
+ String datasourceName,
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ DataSourceConfigSwitcher dataSourceConfigSwitcher =
+ getDataSourceConfigSwitcher(datasourceName.toUpperCase());
+ return dataSourceConfigSwitcher.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ private static DataSourceConfigSwitcher getDataSourceConfigSwitcher(String datasourceName) {
+ checkNotNull(datasourceName, "datasourceName cannot be null");
+ // Use SPI
+ switch (datasourceName.toUpperCase()) {
+ case "JDBC-MYSQL":
+ return MysqlDatasourceConfigSwitcher.INSTANCE;
+ case "ELASTICSEARCH":
+ return ElasticSearchDataSourceConfigSwitcher.INSTANCE;
+ case "KAFKA":
+ return KafkaDataSourceConfigSwitcher.getInstance();
+ case "STARROCKS":
+ return StarRocksDataSourceConfigSwitcher.INSTANCE;
+ case "MYSQL-CDC":
+ return MysqlCDCDataSourceConfigSwitcher.INSTANCE;
+ case "S3-REDSHIFT":
+ return S3RedshiftDataSourceConfigSwitcher.getInstance();
+ case "JDBC-CLICKHOUSE":
+ return ClickhouseDataSourceConfigSwitcher.getInstance();
+ case "JDBC-POSTGRES":
+ return PostgresqlDataSourceConfigSwitcher.getInstance();
+ case "JDBC-REDSHIFT":
+ return RedshiftDataSourceConfigSwitcher.getInstance();
+ case "JDBC-SQLSERVER":
+ return SqlServerDataSourceConfigSwitcher.getInstance();
+ case "JDBC-TIDB":
+ return TidbDataSourceConfigSwitcher.INSTANCE;
+ case "JDBC-ORACLE":
+ return OracleDataSourceConfigSwitcher.INSTANCE;
+ case "S3":
+ return S3DataSourceConfigSwitcher.getInstance();
+ case "SQLSERVER-CDC":
+ return SqlServerCDCDataSourceConfigSwitcher.INSTANCE;
+ case "POSTGRES-CDC":
+ return PostgresCDCDataSourceConfigSwitcher.INSTANCE;
+
+ default:
+ throw new SeaTunnelException(
+ "data source : "
+ + datasourceName
+ + " is no implementation class for DataSourceConfigSwitcher");
+ }
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java
new file mode 100644
index 00000000..730ee658
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/SchemaGenerator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SchemaGenerator {
+
+ private SchemaGenerator() {
+ }
+
+ /**
+ * Generate the schema of the table.
+ *
+ * <pre>
+ * fields {
+ * name = "string"
+ * age = "int"
+ * }
+ * </pre>
+ *
+ * @param virtualTableDetailRes virtual table detail.
+ * @param selectTableFields select table fields which need to be placed in the schema.
+ * @return schema.
+ */
+ public static Config generateSchemaBySelectTableFields(
+ VirtualTableDetailRes virtualTableDetailRes, SelectTableFields selectTableFields) {
+ checkNotNull(selectTableFields, "selectTableFields cannot be null");
+ checkArgument(
+ CollectionUtils.isNotEmpty(selectTableFields.getTableFields()),
+ "selectTableFields.tableFields cannot be empty");
+
+ checkNotNull(virtualTableDetailRes, "virtualTableDetailRes cannot be null");
+ checkArgument(
+ CollectionUtils.isNotEmpty(virtualTableDetailRes.getFields()),
+ "virtualTableDetailRes.fields cannot be empty");
+
+ Map<String, VirtualTableFieldRes> fieldTypeMap =
+ virtualTableDetailRes.getFields().stream()
+ .collect(
+ Collectors.toMap(
+ VirtualTableFieldRes::getFieldName, Function.identity()));
+
+ Config schema = ConfigFactory.empty();
+ for (String fieldName : selectTableFields.getTableFields()) {
+ VirtualTableFieldRes virtualTableFieldRes =
+ checkNotNull(
+ fieldTypeMap.get(fieldName),
+ String.format(
+ "Cannot find the field: %s from virtual table", fieldName));
+ schema =
+ schema.withValue(
+ fieldName,
+ ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+ }
+ return schema.atKey("fields");
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..f8bba3dc
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/BaseJdbcDataSourceConfigSwitcher.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseJdbcDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+ private static final String TABLE_KEY = "table";
+ private static final String DATABASE_KEY = "database";
+
+ private static final String QUERY_KEY = "query";
+
+ private static final String GENERATE_SINK_SQL = "generate_sink_sql";
+
+ private static final String URL_KEY = "url";
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ Map<PluginType, List<String>> filterFieldMap = new HashMap<>();
+
+ filterFieldMap.put(
+ PluginType.SINK,
+ Arrays.asList(QUERY_KEY, TABLE_KEY, DATABASE_KEY, GENERATE_SINK_SQL));
+ filterFieldMap.put(PluginType.SOURCE, Collections.singletonList(QUERY_KEY));
+
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ filterFieldMap.get(pluginType));
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+
+ // replace database in url
+ if (dataSourceOption.getDatabases().size() == 1) {
+ String databaseName = dataSourceOption.getDatabases().get(0);
+ String url = dataSourceInstanceConfig.getString(URL_KEY);
+ String newUrl = replaceDatabaseNameInUrl(url, databaseName);
+ dataSourceInstanceConfig =
+ dataSourceInstanceConfig.withValue(
+ URL_KEY, ConfigValueFactory.fromAnyRef(newUrl));
+ }
+ if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ GENERATE_SINK_SQL, ConfigValueFactory.fromAnyRef(false));
+ }
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+
+ String databaseName = dataSourceOption.getDatabases().get(0);
+
+ String tableName = dataSourceOption.getTables().get(0);
+
+ // create sql from schema
+ if (pluginType.equals(PluginType.SOURCE)) {
+
+ List<String> tableFields = selectTableFields.getTableFields();
+
+ String sql = tableFieldsToSql(tableFields, databaseName, tableName);
+
+ connectorConfig =
+ connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql));
+ } else if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName));
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ } else if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
+ String databaseName = dataSourceOption.getDatabases().get(0);
+ if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName));
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ } else {
+ throw new UnsupportedOperationException(
+ "JDBC DATA_REPLICA Unsupported plugin type: " + pluginType);
+ }
+
+ } else {
+ throw new UnsupportedOperationException("Unsupported businessMode : " + businessMode);
+ }
+ }
+
+ protected String generateSql(
+ List<String> tableFields, String database, String schema, String table) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for (int i = 0; i < tableFields.size(); i++) {
+ sb.append(quoteIdentifier(tableFields.get(i)));
+ if (i < tableFields.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ sb.append(" FROM ").append(quoteIdentifier(database));
+ if (schema != null && !schema.isEmpty()) {
+ sb.append(".").append(quoteIdentifier(schema));
+ }
+ sb.append(".").append(quoteIdentifier(table));
+
+ return sb.toString();
+ }
+
+ protected String tableFieldsToSql(List<String> tableFields, String database, String table) {
+ return generateSql(tableFields, database, null, table);
+ }
+
+ protected String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+ return url;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..72fb36b7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ClickhouseDataSourceConfigSwitcher.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+public class ClickhouseDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+ private static final ClickhouseDataSourceConfigSwitcher INSTANCE =
+ new ClickhouseDataSourceConfigSwitcher();
+
+ private static final String HOST = "host";
+ private static final String URL = "url";
+ private static final String SQL = "sql";
+ private static final String DATABASE = "database";
+ private static final String TABLE = "table";
+
+ private static final Map<PluginType, List<String>> FILTER_FIELD_MAP =
+ new ImmutableMap.Builder<PluginType, List<String>>()
+ .put(PluginType.SOURCE, Lists.newArrayList(SQL, HOST))
+ .put(PluginType.SINK, Lists.newArrayList(HOST, DATABASE, TABLE))
+ .build();
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ FILTER_FIELD_MAP.get(pluginType));
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ switch (businessMode) {
+ case DATA_REPLICA:
+ // We only support sink in data replica mode
+ if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceOption.getDatabases().get(0)));
+ } else {
+ throw new UnsupportedOperationException(
+ "Clickhouse DATA_REPLICA Unsupported plugin type: " + pluginType);
+ }
+ break;
+ case DATA_INTEGRATION:
+ // generate the sql by the schema
+ if (pluginType.equals(PluginType.SOURCE)) {
+ List<String> tableFields = selectTableFields.getTableFields();
+ String sql =
+ String.format(
+ "SELECT %s FROM %s",
+ String.join(",", tableFields),
+ String.format(
+ "`%s`.`%s`",
+ dataSourceOption.getDatabases().get(0),
+ dataSourceOption.getTables().get(0)));
+ connectorConfig =
+ connectorConfig.withValue(SQL, ConfigValueFactory.fromAnyRef(sql));
+ } else if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceOption.getDatabases().get(0)));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceOption.getTables().get(0)));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported plugin type: " + pluginType);
+ }
+ break;
+ default:
+ break;
+ }
+ connectorConfig =
+ connectorConfig.withValue(
+ HOST,
+ ConfigValueFactory.fromAnyRef(
+ JdbcUtils.getAddressFromUrl(
+ dataSourceInstanceConfig.getString(URL))));
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ public static ClickhouseDataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+
+ private ClickhouseDataSourceConfigSwitcher() {
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..a378fae7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/ElasticSearchDataSourceConfigSwitcher.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ElasticSearchDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ public static final ElasticSearchDataSourceConfigSwitcher INSTANCE =
+ new ElasticSearchDataSourceConfigSwitcher();
+
+ private static final String SOURCE = "source";
+ private static final String SCHEMA = "schema";
+ private static final String PRIMARY_KEYS = "primary_keys";
+ private static final String INDEX = "index";
+
+ private ElasticSearchDataSourceConfigSwitcher() {
+ }
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ // DELETE source/schema
+ excludedKeys.addAll(Arrays.asList(SOURCE, SCHEMA, INDEX));
+ } else if (PluginType.SINK.equals(pluginType)) {
+ excludedKeys.add(INDEX);
+ // DELETE primary_keys
+ if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
+ excludedKeys.add(PRIMARY_KEYS);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ // Add source
+ connectorConfig =
+ connectorConfig.withValue(
+ INDEX,
+ ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)));
+ connectorConfig =
+ connectorConfig.withValue(
+ SOURCE,
+ ConfigValueFactory.fromIterable(
+ selectTableFields.getTableFields()));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported business mode: " + businessMode);
+ }
+ } else if (PluginType.SINK.equals(pluginType)) {
+ // TODO Add primary_keys
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ // Add Index
+ connectorConfig =
+ connectorConfig.withValue(
+ INDEX,
+ ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)));
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..3e7b5fdd
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/KafkaDataSourceConfigSwitcher.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.SchemaGenerator;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.List;
+
+public class KafkaDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private static final KafkaDataSourceConfigSwitcher INSTANCE =
+ new KafkaDataSourceConfigSwitcher();
+
+ private static final String SCHEMA = "schema";
+ private static final String TOPIC = "topic";
+ private static final String TABLE = "table";
+ private static final String FORMAT = "format";
+
+ private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (pluginType == PluginType.SOURCE) {
+ excludedKeys.add(SCHEMA);
+ excludedKeys.add(TOPIC);
+ }
+ if (pluginType == PluginType.SINK && businessMode.equals(BusinessMode.DATA_REPLICA)) {
+ excludedKeys.add(FORMAT);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (pluginType == PluginType.SOURCE) {
+ // Use field to generate the schema
+ connectorConfig =
+ connectorConfig.withValue(
+ TOPIC,
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail.getDatasourceProperties().get(TOPIC)));
+ connectorConfig =
+ connectorConfig.withValue(
+ SCHEMA,
+ SchemaGenerator.generateSchemaBySelectTableFields(
+ virtualTableDetail, selectTableFields)
+ .root());
+ } else if (pluginType == PluginType.SINK) {
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ // Set the table name to topic
+ connectorConfig =
+ connectorConfig.withValue(
+ TOPIC,
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail.getDatasourceProperties().get(TOPIC)));
+ }
+ if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ FORMAT, ConfigValueFactory.fromAnyRef(DEBEZIUM_FORMAT));
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ private KafkaDataSourceConfigSwitcher() {
+ }
+
+ public static KafkaDataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..2d99e9bd
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+public class MysqlCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private MysqlCDCDataSourceConfigSwitcher() {
+ }
+
+ public static final MysqlCDCDataSourceConfigSwitcher INSTANCE =
+ new MysqlCDCDataSourceConfigSwitcher();
+
+ private static final String FACTORY = "factory";
+
+ private static final String CATALOG = "catalog";
+
+ private static final String TABLE_NAMES = "table-names";
+
+ private static final String DATABASE_NAMES = "database-names";
+
+ private static final String FORMAT_KEY = "format";
+
+ private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+ private static final String DEFAULT_FORMAT = "DEFAULT";
+
+ private static final String SCHEMA = "schema";
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ excludedKeys.add(DATABASE_NAMES);
+ excludedKeys.add(TABLE_NAMES);
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ excludedKeys.add(FORMAT_KEY);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ // Add table-names
+ Config config = ConfigFactory.empty();
+ config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("Mysql"));
+ connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_NAMES,
+ ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE_NAMES,
+ ConfigValueFactory.fromIterable(
+ mergeDatabaseAndTables(dataSourceOption)));
+
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+ } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+ && connectorConfig
+ .getString(FORMAT_KEY)
+ .toUpperCase(Locale.ROOT)
+ .equals(DEBEZIUM_FORMAT)) {
+ connectorConfig =
+ connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ private Config generateDebeziumFormatSchema() {
+ List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+ fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+ Config schema = ConfigFactory.empty();
+ for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+ schema =
+ schema.withValue(
+ virtualTableFieldRes.getFieldName(),
+ ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+ }
+ return schema.atKey("fields");
+ }
+
+ private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+ List<String> tables = new ArrayList<>();
+ dataSourceOption
+ .getDatabases()
+ .forEach(
+ database -> {
+ dataSourceOption
+ .getTables()
+ .forEach(
+ table -> {
+ if (table.contains(".")) {
+ tables.add(table);
+ } else {
+ tables.add(
+ getDatabaseAndTable(database, table));
+ }
+ });
+ });
+ return tables;
+ }
+
+ private String getDatabaseAndTable(String database, String table) {
+ return String.format("%s.%s", database, table);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java
new file mode 100644
index 00000000..c27d323d
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/MysqlDatasourceConfigSwitcher.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+public class MysqlDatasourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+ public static MysqlDatasourceConfigSwitcher INSTANCE = new MysqlDatasourceConfigSwitcher();
+
+ private MysqlDatasourceConfigSwitcher() {
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..43d85c84
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/OracleDataSourceConfigSwitcher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class OracleDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+ public static final OracleDataSourceConfigSwitcher INSTANCE =
+ new OracleDataSourceConfigSwitcher();
+
+ protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+ String[] split = fullTable.split("\\.");
+ if (split.length != 2) {
+ throw new SeaTunnelException(
+ "The tableName for oracle must be schemaName.tableName, but tableName is "
+ + fullTable);
+ }
+
+ String schemaName = split[0];
+ String tableName = split[1];
+
+ return generateSql(tableFields, database, schemaName, tableName);
+ }
+
+ protected String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ @Override
+ protected String generateSql(
+ List<String> tableFields, String database, String schema, String table) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for (int i = 0; i < tableFields.size(); i++) {
+ sb.append(quoteIdentifier(tableFields.get(i)));
+ if (i < tableFields.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ sb.append(" FROM ")
+ .append(quoteIdentifier(schema))
+ .append(".")
+ .append(quoteIdentifier(table));
+ return sb.toString();
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..1b282dc0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+public class PostgresCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private PostgresCDCDataSourceConfigSwitcher() {
+ }
+
+ public static final PostgresCDCDataSourceConfigSwitcher INSTANCE =
+ new PostgresCDCDataSourceConfigSwitcher();
+
+ private static final String FACTORY = "factory";
+
+ private static final String CATALOG = "catalog";
+
+ private static final String TABLE_NAMES = "table-names";
+
+ private static final String DATABASE_NAMES = "database-names";
+
+ private static final String FORMAT_KEY = "format";
+
+ private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+ private static final String DEFAULT_FORMAT = "DEFAULT";
+
+ private static final String SCHEMA = "schema";
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ excludedKeys.add(DATABASE_NAMES);
+ excludedKeys.add(TABLE_NAMES);
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ excludedKeys.add(FORMAT_KEY);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ // Add table-names
+ Config config = ConfigFactory.empty();
+ config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("Postgres"));
+ connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_NAMES,
+ ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE_NAMES,
+ ConfigValueFactory.fromIterable(
+ mergeDatabaseAndTables(dataSourceOption)));
+
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+ } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+ && connectorConfig
+ .getString(FORMAT_KEY)
+ .toUpperCase(Locale.ROOT)
+ .equals(DEBEZIUM_FORMAT)) {
+ connectorConfig =
+ connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ private Config generateDebeziumFormatSchema() {
+ List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+ fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+ Config schema = ConfigFactory.empty();
+ for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+ schema =
+ schema.withValue(
+ virtualTableFieldRes.getFieldName(),
+ ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+ }
+ return schema.atKey("fields");
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+ List<String> tables = new ArrayList<>();
+ dataSourceOption
+ .getDatabases()
+ .forEach(
+ database -> {
+ dataSourceOption
+ .getTables()
+ .forEach(
+ table -> {
+ final String[] tableFragments = table.split("\\.");
+ if (tableFragments.length == 3) {
+ tables.add(table);
+ } else if (tableFragments.length == 2) {
+ tables.add(
+ getDatabaseAndTable(database, table));
+ } else {
+ throw new IllegalArgumentException(
+ "Illegal postgres table-name: "
+ + table);
+ }
+ });
+ });
+ return tables;
+ }
+
+ private String getDatabaseAndTable(String database, String table) {
+ return String.format("%s.%s", database, table);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..90a983d9
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/PostgresqlDataSourceConfigSwitcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.thridparty.datasource.DataSourceConfigSwitcher;
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import java.util.List;
+
+public class PostgresqlDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+
+ private static final PostgresqlDataSourceConfigSwitcher INSTANCE =
+ new PostgresqlDataSourceConfigSwitcher();
+
+ private PostgresqlDataSourceConfigSwitcher() {
+ }
+
+ protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+
+ String[] split = fullTable.split("\\.");
+ if (split.length != 2) {
+ throw new SeaTunnelException(
+ "The tableName for postgres must be schemaName.tableName, but tableName is "
+ + fullTable);
+ }
+
+ String schemaName = split[0];
+ String tableName = split[1];
+
+ return generateSql(tableFields, database, schemaName, tableName);
+ }
+
+ protected String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+ return JdbcUtils.replaceDatabase(url, databaseName);
+ }
+
+ public static DataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..98da884c
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/RedshiftDataSourceConfigSwitcher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.utils.JdbcUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import java.util.List;
+
+public class RedshiftDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+ private static final RedshiftDataSourceConfigSwitcher INSTANCE =
+ new RedshiftDataSourceConfigSwitcher();
+
+ public static final RedshiftDataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+
+ protected String tableFieldsToSql(List<String> tableFields, String database, String fullTable) {
+
+ String[] split = fullTable.split("\\.");
+ if (split.length != 2) {
+ throw new SeaTunnelException(
+ "The tableName for postgres must be schemaName.tableName, but tableName is "
+ + fullTable);
+ }
+
+ String schemaName = split[0];
+ String tableName = split[1];
+
+ return generateSql(tableFields, database, schemaName, tableName);
+ }
+
+ protected String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ protected String replaceDatabaseNameInUrl(String url, String databaseName) {
+ return JdbcUtils.replaceDatabase(url, databaseName);
+ }
+
+ private RedshiftDataSourceConfigSwitcher() {
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d0c30cf0
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3DataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.app.thridparty.datasource.SchemaGenerator;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.datasource.plugin.s3.S3OptionRule;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class S3DataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private S3DataSourceConfigSwitcher() {
+ }
+
+ private static final S3DataSourceConfigSwitcher INSTANCE = new S3DataSourceConfigSwitcher();
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ excludedKeys.add(S3OptionRule.PATH.key());
+ if (PluginType.SOURCE.equals(pluginType)) {
+ excludedKeys.add(S3OptionRule.SCHEMA.key());
+ }
+
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ connectorConfig =
+ connectorConfig
+ .withValue(
+ S3OptionRule.SCHEMA.key(),
+ SchemaGenerator.generateSchemaBySelectTableFields(
+ virtualTableDetail, selectTableFields)
+ .root())
+ .withValue(
+ S3OptionRule.PATH.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.PATH.key())))
+ .withValue(
+ S3OptionRule.TYPE.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.TYPE.key())))
+ .withValue(
+ S3OptionRule.PARSE_PARSE_PARTITION_FROM_PATH.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(
+ S3OptionRule
+ .PARSE_PARSE_PARTITION_FROM_PATH
+ .key())))
+ .withValue(
+ S3OptionRule.DATE_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.DATE_FORMAT.key())))
+ .withValue(
+ S3OptionRule.DATETIME_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.DATETIME_FORMAT.key())))
+ .withValue(
+ S3OptionRule.TIME_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.TIME_FORMAT.key())));
+ } else if (PluginType.SINK.equals(pluginType)) {
+ if (virtualTableDetail.getDatasourceProperties().get(S3OptionRule.TIME_FORMAT.key()) == null) {
+ throw new IllegalArgumentException("S3 virtual table path is null");
+ }
+ connectorConfig =
+ connectorConfig
+ .withValue(
+ S3OptionRule.PATH.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.PATH.key())))
+ .withValue(
+ S3OptionRule.TYPE.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.TYPE.key())))
+ .withValue(
+ S3OptionRule.PARSE_PARSE_PARTITION_FROM_PATH.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(
+ S3OptionRule
+ .PARSE_PARSE_PARTITION_FROM_PATH
+ .key())))
+ .withValue(
+ S3OptionRule.DATE_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.DATE_FORMAT.key())))
+ .withValue(
+ S3OptionRule.DATETIME_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.DATETIME_FORMAT.key())))
+ .withValue(
+ S3OptionRule.TIME_FORMAT.key(),
+ ConfigValueFactory.fromAnyRef(
+ virtualTableDetail
+ .getDatasourceProperties()
+ .get(S3OptionRule.TIME_FORMAT.key())));
+ }
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ public static S3DataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..dd7b208a
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/S3RedshiftDataSourceConfigSwitcher.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.List;
+
+public class S3RedshiftDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private S3RedshiftDataSourceConfigSwitcher() {
+ }
+
+ private static final S3RedshiftDataSourceConfigSwitcher INSTANCE =
+ new S3RedshiftDataSourceConfigSwitcher();
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ excludedKeys.add("access_key");
+ excludedKeys.add("secret_key");
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ public static S3RedshiftDataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..807d6e4c
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerCDCDataSourceConfigSwitcher.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableFieldRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+public class SqlServerCDCDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private SqlServerCDCDataSourceConfigSwitcher() {
+ }
+
+ public static final SqlServerCDCDataSourceConfigSwitcher INSTANCE =
+ new SqlServerCDCDataSourceConfigSwitcher();
+
+ private static final String FACTORY = "factory";
+
+ private static final String CATALOG = "catalog";
+
+ private static final String TABLE_NAMES = "table-names";
+
+ private static final String DATABASE_NAMES = "database-names";
+
+ private static final String FORMAT_KEY = "format";
+
+ private static final String DEBEZIUM_FORMAT = "COMPATIBLE_DEBEZIUM_JSON";
+
+ private static final String DEFAULT_FORMAT = "DEFAULT";
+
+ private static final String SCHEMA = "schema";
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ excludedKeys.add(DATABASE_NAMES);
+ excludedKeys.add(TABLE_NAMES);
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ excludedKeys.add(FORMAT_KEY);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ // Add table-names
+ Config config = ConfigFactory.empty();
+ config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("SqlServer"));
+ connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_NAMES,
+ ConfigValueFactory.fromIterable(dataSourceOption.getDatabases()));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE_NAMES,
+ ConfigValueFactory.fromIterable(
+ mergeDatabaseAndTables(dataSourceOption)));
+
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ FORMAT_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_FORMAT));
+ } else if (businessMode.equals(BusinessMode.DATA_REPLICA)
+ && connectorConfig
+ .getString(FORMAT_KEY)
+ .toUpperCase(Locale.ROOT)
+ .equals(DEBEZIUM_FORMAT)) {
+ connectorConfig =
+ connectorConfig.withValue(SCHEMA, generateDebeziumFormatSchema().root());
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+
+ private Config generateDebeziumFormatSchema() {
+ List<VirtualTableFieldRes> fieldResList = new ArrayList<>();
+ fieldResList.add(new VirtualTableFieldRes("topic", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("key", "string", false, null, false, "", ""));
+ fieldResList.add(new VirtualTableFieldRes("value", "string", false, null, false, "", ""));
+
+ Config schema = ConfigFactory.empty();
+ for (VirtualTableFieldRes virtualTableFieldRes : fieldResList) {
+ schema =
+ schema.withValue(
+ virtualTableFieldRes.getFieldName(),
+ ConfigValueFactory.fromAnyRef(virtualTableFieldRes.getFieldType()));
+ }
+ return schema.atKey("fields");
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private List<String> mergeDatabaseAndTables(DataSourceOption dataSourceOption) {
+ List<String> tables = new ArrayList<>();
+ dataSourceOption
+ .getDatabases()
+ .forEach(
+ database -> {
+ dataSourceOption
+ .getTables()
+ .forEach(
+ table -> {
+ final String[] tableFragments = table.split("\\.");
+ if (tableFragments.length == 3) {
+ tables.add(table);
+ } else if (tableFragments.length == 2) {
+ tables.add(
+ getDatabaseAndTable(database, table));
+ } else {
+ throw new IllegalArgumentException(
+ "Illegal sqlserver table-name: "
+ + table);
+ }
+ });
+ });
+ return tables;
+ }
+
+ private String getDatabaseAndTable(String database, String table) {
+ return String.format("%s.%s", database, table);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..d5d492f7
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/SqlServerDataSourceConfigSwitcher.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+
+public class SqlServerDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private static final SqlServerDataSourceConfigSwitcher INSTANCE =
+ new SqlServerDataSourceConfigSwitcher();
+
+ public static final SqlServerDataSourceConfigSwitcher getInstance() {
+ return INSTANCE;
+ }
+
+ private SqlServerDataSourceConfigSwitcher() {
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..9cd11265
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/StarRocksDataSourceConfigSwitcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.thridparty.datasource.AbstractDataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class StarRocksDataSourceConfigSwitcher extends AbstractDataSourceConfigSwitcher {
+
+ private static final String TABLE = "table";
+
+ private static final String DATABASE = "database";
+
+ private StarRocksDataSourceConfigSwitcher() {
+ }
+
+ public static final StarRocksDataSourceConfigSwitcher INSTANCE =
+ new StarRocksDataSourceConfigSwitcher();
+
+ @Override
+ public FormStructure filterOptionRule(
+ String connectorName,
+ OptionRule dataSourceOptionRule,
+ OptionRule virtualTableOptionRule,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ OptionRule connectorOptionRule,
+ List<String> excludedKeys) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ throw new UnsupportedOperationException("Unsupported PluginType: " + pluginType);
+ } else if (PluginType.SINK.equals(pluginType)) {
+ excludedKeys.addAll(Arrays.asList(TABLE, DATABASE));
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+ return super.filterOptionRule(
+ connectorName,
+ dataSourceOptionRule,
+ virtualTableOptionRule,
+ businessMode,
+ pluginType,
+ connectorOptionRule,
+ excludedKeys);
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ if (PluginType.SOURCE.equals(pluginType)) {
+ throw new UnsupportedOperationException("Unsupported PluginType: " + pluginType);
+ } else if (PluginType.SINK.equals(pluginType)) {
+ if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+ // Add Table
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE,
+ ConfigValueFactory.fromAnyRef(dataSourceOption.getTables().get(0)));
+ }
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE,
+ ConfigValueFactory.fromAnyRef(dataSourceOption.getDatabases().get(0)));
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java
new file mode 100644
index 00000000..17dfcef6
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/datasource/impl/TidbDataSourceConfigSwitcher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.datasource.impl;
+
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+public class TidbDataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+ public static final TidbDataSourceConfigSwitcher INSTANCE = new TidbDataSourceConfigSwitcher();
+
+ private static final String FACTORY = "factory";
+
+ private static final String CATALOG = "catalog";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String BASE_URL = "base-url";
+
+ private static final String TABLE_NAMES = "table-names";
+
+ private static final String DATABASE_NAMES = "database-names";
+
+ private TidbDataSourceConfigSwitcher() {
+ }
+
+ @Override
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+ // Add TiDB catalog options
+ Config config = ConfigFactory.empty();
+ config = config.withValue(FACTORY, ConfigValueFactory.fromAnyRef("TiDB"));
+ config =
+ config.withValue(
+ USERNAME,
+ ConfigValueFactory.fromAnyRef(dataSourceInstanceConfig.getString("user")));
+ config =
+ config.withValue(
+ PASSWORD,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceInstanceConfig.getString(PASSWORD)));
+ config =
+ config.withValue(
+ BASE_URL,
+ ConfigValueFactory.fromAnyRef(dataSourceInstanceConfig.getString("url")));
+ connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java
new file mode 100644
index 00000000..c761c9db
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/exceptions/UnSupportWrapperException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.exceptions;
+
+import lombok.NonNull;
+
+public class UnSupportWrapperException extends RuntimeException {
+ public UnSupportWrapperException(
+ @NonNull String formName, @NonNull String label, @NonNull String typeName) {
+ super(
+ String.format(
+ "Form: %s, label: %s, typeName: %s not yet supported",
+ formName, label, typeName));
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java
new file mode 100644
index 00000000..63d69ddb
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/FormOptionSort.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import org.apache.seatunnel.app.dynamicforms.AbstractFormOption;
+import org.apache.seatunnel.app.dynamicforms.Constants;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FormOptionSort {
+ /**
+ * Sort form structure.
+ */
+ public static FormStructure sortFormStructure(@NonNull FormStructure formStructure) {
+ List<AbstractFormOption> newFormOptions = new ArrayList<>();
+ List<AbstractFormOption> formOptions = formStructure.getForms();
+ formOptions.forEach(currFormOption -> {
+ if (currFormOption.getShow() != null && currFormOption.getShow().size() > 0) {
+ return;
+ }
+ addShowOptionAfter(currFormOption, formOptions, newFormOptions);
+ });
+
+ return FormStructure.builder().name(formStructure.getName()).withLocale(formStructure.getLocales()).addFormOption(newFormOptions.toArray(new AbstractFormOption[0])).build();
+ }
+
+ public static void addShowOptionAfter(@NonNull AbstractFormOption currFormOption, @NonNull List<AbstractFormOption> allFormOptions, @NonNull List<AbstractFormOption> newFormOptions) {
+ if (newFormOptions.contains(currFormOption)) {
+ return;
+ }
+
+ newFormOptions.add(currFormOption);
+
+ List<AbstractFormOption> showOptions = allFormOptions.stream().filter(nextOption -> {
+ return nextOption.getShow() != null && nextOption.getShow().size() > 0 && nextOption.getShow().get(Constants.SHOW_FIELD).toString().equals(currFormOption.getField());
+ }).collect(Collectors.toList());
+
+ if (CollectionUtils.isEmpty(showOptions)) {
+ return;
+ }
+
+ for (AbstractFormOption showOption : showOptions) {
+ addShowOptionAfter(showOption, allFormOptions, newFormOptions);
+ }
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java
new file mode 100644
index 00000000..9ef90143
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/PluginDiscoveryUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorFeature;
+import org.apache.seatunnel.app.domain.response.connector.ConnectorInfo;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+public class PluginDiscoveryUtil {
+
+ public static List<ConnectorInfo> getAllConnectorsFromPluginMapping(PluginType pluginType) {
+ Map<PluginIdentifier, String> plugins = AbstractPluginDiscovery.getAllSupportedPlugins(pluginType);
+ List<ConnectorInfo> connectorInfos = new ArrayList<>();
+ plugins.forEach((plugin, artifactId) -> connectorInfos.add(new ConnectorInfo(plugin, artifactId)));
+ return connectorInfos;
+ }
+
+ public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {
+ Common.setStarter(true);
+ if (!pluginType.equals(PluginType.SOURCE)) {
+ throw new UnsupportedOperationException("ONLY support plugin type source");
+ }
+ Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
+ List<Factory> factories;
+ if (path.toFile().exists()) {
+ List<URL> files = FileUtils.searchJarFiles(path);
+ factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
+ } else {
+ factories = FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
+ }
+ Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
+ factories.forEach(plugin -> {
+ if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
+ TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
+ PluginIdentifier info = PluginIdentifier.of("seatunnel", PluginType.SOURCE.getType(), plugin.factoryIdentifier());
+ featureMap.put(info, new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));
+ }
+ });
+ return featureMap;
+ }
+
+ public static List<ConnectorInfo> getDownloadedConnectors(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins, @NonNull PluginType pluginType) throws IOException {
+ LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(pluginType);
+ if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+ return new ArrayList<>();
+ }
+
+ List<ConnectorInfo> connectorInfos = new ArrayList<>();
+ pluginIdentifierOptionRuleLinkedHashMap.forEach((plugin, optionRule) -> connectorInfos.add(new ConnectorInfo(plugin, null)));
+ return connectorInfos;
+ }
+
+ public static List<ConnectorInfo> getTransforms(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins) throws IOException {
+ LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(PluginType.TRANSFORM);
+ if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+ return new ArrayList<>();
+ }
+ return pluginIdentifierOptionRuleLinkedHashMap.keySet().stream().map(t -> new ConnectorInfo(t, null)).collect(Collectors.toList());
+ }
+
+ public static Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> getAllConnectors() throws IOException {
+ return new SeaTunnelSinkPluginDiscovery().getAllPlugin();
+ }
+
+ public static ConcurrentMap<String, FormStructure> getDownloadedConnectorFormStructures(@NonNull Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins, @NonNull PluginType pluginType) {
+ LinkedHashMap<PluginIdentifier, OptionRule> pluginIdentifierOptionRuleLinkedHashMap = allPlugins.get(pluginType);
+ ConcurrentMap<String, FormStructure> result = new ConcurrentHashMap<>();
+ if (pluginIdentifierOptionRuleLinkedHashMap == null) {
+ return result;
+ }
+
+ pluginIdentifierOptionRuleLinkedHashMap.forEach((key, value) -> result.put(key.getPluginName(), SeaTunnelOptionRuleWrapper.wrapper(value, key.getPluginName(), pluginType)));
+
+ return result;
+ }
+
+ public static ConcurrentMap<String, FormStructure> getTransformFormStructures(Map<PluginType, LinkedHashMap<PluginIdentifier, OptionRule>> allPlugins) {
+ return getDownloadedConnectorFormStructures(allPlugins, PluginType.TRANSFORM);
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java
new file mode 100644
index 00000000..d54f3a4e
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/SeaTunnelOptionRuleWrapper.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import static org.apache.seatunnel.app.common.SeaTunnelConnectorI18n.CONNECTOR_I18N_CONFIG_EN;
+import static org.apache.seatunnel.app.common.SeaTunnelConnectorI18n.CONNECTOR_I18N_CONFIG_ZH;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.app.dynamicforms.AbstractFormOption;
+import org.apache.seatunnel.app.dynamicforms.FormLocale;
+import org.apache.seatunnel.app.dynamicforms.FormOptionBuilder;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import org.apache.seatunnel.app.dynamicforms.FormStructureBuilder;
+import org.apache.seatunnel.app.dynamicforms.validate.ValidateBuilder;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+public class SeaTunnelOptionRuleWrapper {
+ public static FormStructure wrapper(
+ @NonNull OptionRule optionRule,
+ @NonNull String connectorName,
+ @NonNull PluginType pluginType) {
+ return wrapper(
+ optionRule.getOptionalOptions(),
+ optionRule.getRequiredOptions(),
+ connectorName + "[" + pluginType.getType() + "]");
+ }
+
+ public static FormStructure wrapper(
+ @NonNull List<Option<?>> optionList,
+ @NonNull List<RequiredOption> requiredList,
+ @NonNull String connectorName,
+ @NonNull PluginType pluginType) {
+ return wrapper(optionList, requiredList, connectorName + "[" + pluginType.getType() + "]");
+ }
+
+ public static FormStructure wrapper(@NonNull OptionRule optionRule, @NonNull String name) {
+ return wrapper(optionRule.getOptionalOptions(), optionRule.getRequiredOptions(), name);
+ }
+
+ public static FormStructure wrapper(
+ @NonNull List<Option<?>> optionList,
+ @NonNull List<RequiredOption> requiredList,
+ @NonNull String name) {
+ FormLocale locale = new FormLocale();
+ List<AbstractFormOption> optionFormOptions = wrapperOptionOptions(name, optionList, locale);
+ List<List<AbstractFormOption>> requiredFormOptions =
+ wrapperRequiredOptions(name, requiredList, locale);
+
+ FormStructureBuilder formStructureBuilder = FormStructure.builder().name(name);
+
+ if (!CollectionUtils.isEmpty(requiredFormOptions)) {
+ requiredFormOptions.forEach(
+ list -> {
+ if (CollectionUtils.isEmpty(list)) {
+ return;
+ }
+
+ formStructureBuilder.addFormOption(list.toArray(new AbstractFormOption[1]));
+ });
+ }
+
+ if (!CollectionUtils.isEmpty(optionFormOptions)) {
+ formStructureBuilder.addFormOption(
+ optionFormOptions.toArray(new AbstractFormOption[1]));
+ }
+
+ formStructureBuilder.withLocale(locale);
+
+ return FormOptionSort.sortFormStructure(formStructureBuilder.build());
+ }
+
+ private static List<AbstractFormOption> wrapperOptionOptions(
+ @NonNull String connectorName, @NonNull List<Option<?>> optionList, FormLocale locale) {
+ return optionList.stream()
+ .map(
+ option -> {
+ return wrapperToFormOption(connectorName, option, locale);
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<List<AbstractFormOption>> wrapperRequiredOptions(
+ @NonNull String connectorName,
+ @NonNull List<RequiredOption> requiredList,
+ FormLocale locale) {
+ List<List<AbstractFormOption>> formOptionsList =
+ requiredList.stream()
+ .map(
+ option -> {
+ if (option instanceof RequiredOption.AbsolutelyRequiredOptions) {
+ List<AbstractFormOption> collect =
+ ((RequiredOption.AbsolutelyRequiredOptions) option)
+ .getRequiredOption().stream()
+ .map(
+ requiredOption -> {
+ return wrapperToFormOption(
+ connectorName,
+ requiredOption,
+ locale)
+ .withValidate(
+ ValidateBuilder
+ .builder()
+ .nonEmptyValidateBuilder()
+ .nonEmptyValidate());
+ })
+ .collect(Collectors.toList());
+ return collect;
+ }
+
+ if (option instanceof RequiredOption.BundledRequiredOptions) {
+ List<Option<?>> bundledRequiredOptions =
+ ((RequiredOption.BundledRequiredOptions) option)
+ .getRequiredOption();
+ List<String> bundledFields =
+ bundledRequiredOptions.stream()
+ .map(requiredOption -> requiredOption.key())
+ .collect(Collectors.toList());
+
+ List<AbstractFormOption> collect =
+ bundledRequiredOptions.stream()
+ .map(
+ requiredOption -> {
+ AbstractFormOption
+ bundledRequiredFormOption =
+ wrapperToFormOption(
+ connectorName,
+ requiredOption,
+ locale);
+ bundledRequiredFormOption
+ .withValidate(
+ ValidateBuilder
+ .builder()
+ .unionNonEmptyValidateBuilder()
+ .fields(
+ bundledFields
+ .toArray(
+ new String
+ [1]))
+ .unionNonEmptyValidate());
+ return bundledRequiredFormOption;
+ })
+ .collect(Collectors.toList());
+ return collect;
+ }
+
+ if (option instanceof RequiredOption.ExclusiveRequiredOptions) {
+ List<Option<?>> exclusiveOptions =
+ ((RequiredOption.ExclusiveRequiredOptions) option)
+ .getExclusiveOptions();
+ List<String> exclusiveFields =
+ exclusiveOptions.stream()
+ .map(requiredOption -> requiredOption.key())
+ .collect(Collectors.toList());
+
+ List<AbstractFormOption> collect =
+ exclusiveOptions.stream()
+ .map(
+ requiredOption -> {
+ AbstractFormOption
+ exclusiveRequiredFormOption =
+ wrapperToFormOption(
+ connectorName,
+ requiredOption,
+ locale)
+ .withValidate(
+ ValidateBuilder
+ .builder()
+ .mutuallyExclusiveValidateBuilder()
+ .fields(
+ exclusiveFields
+ .toArray(
+ new String
+ [1]))
+ .mutuallyExclusiveValidate());
+ return exclusiveRequiredFormOption;
+ })
+ .collect(Collectors.toList());
+ return collect;
+ }
+
+ if (option instanceof RequiredOption.ConditionalRequiredOptions) {
+ RequiredOption.ConditionalRequiredOptions
+ conditionalRequiredOptions =
+ (RequiredOption.ConditionalRequiredOptions)
+ option;
+
+ // we only support one field to control a form option, so we
+ // only need get condition key from the
+ // first expression. And all expression is 'or' and every
+ // condition have the same key.
+ String conditionKey =
+ conditionalRequiredOptions
+ .getExpression()
+ .getCondition()
+ .getOption()
+ .key();
+ List<Object> expectValueList = new ArrayList<>();
+ Expression expression =
+ conditionalRequiredOptions.getExpression();
+ expectValueList.add(
+ expression
+ .getCondition()
+ .getExpectValue()
+ .toString());
+ while (expression.hasNext()) {
+ expression = expression.getNext();
+ expectValueList.add(
+ expression
+ .getCondition()
+ .getExpectValue()
+ .toString());
+ }
+ List<AbstractFormOption> collect =
+ conditionalRequiredOptions.getRequiredOption()
+ .stream()
+ .map(
+ requiredOption -> {
+ return wrapperToFormOption(
+ connectorName,
+ requiredOption,
+ locale)
+ .withShow(
+ conditionKey,
+ expectValueList)
+ .withValidate(
+ ValidateBuilder
+ .builder()
+ .nonEmptyValidateBuilder()
+ .nonEmptyValidate());
+ })
+ .collect(Collectors.toList());
+ return collect;
+ }
+
+ throw new UnSupportWrapperException(
+ connectorName, "Unknown", option.toString());
+ })
+ .collect(Collectors.toList());
+
+ return formOptionsList;
+ }
+
+ private static AbstractFormOption wrapperToFormOption(
+ @NonNull String connectorName, @NonNull Option<?> option, @NonNull FormLocale locale) {
+ if (Boolean.class.equals(option.typeReference().getType())) {
+ return selectInput(
+ connectorName,
+ option,
+ Arrays.asList(
+ new ImmutablePair("true", true), new ImmutablePair("false", false)),
+ locale);
+ }
+
+ if (Double.class.equals(option.typeReference().getType())
+ || Duration.class.equals(option.typeReference().getType())
+ || Float.class.equals(option.typeReference().getType())
+ || Integer.class.equals(option.typeReference().getType())
+ || Long.class.equals(option.typeReference().getType())
+ || String.class.equals(option.typeReference().getType())) {
+
+ if (option.key().toLowerCase(Locale.ROOT).equals("password")) {
+ return passwordInput(connectorName, option, locale);
+ }
+ if (option.defaultValue() != null && option.defaultValue().toString().contains("\n")) {
+ return textareaInput(connectorName, option, locale);
+ }
+
+ return textInput(connectorName, option, locale);
+ }
+
+ if (option.typeReference().getType().getTypeName().startsWith("java.util.List")
+ || option.typeReference().getType().getTypeName().startsWith("java.util.Map")
+ || option.typeReference()
+ .getType()
+ .getTypeName()
+ .startsWith("org.apache.seatunnel.api.configuration.Options")) {
+
+ return textareaInput(connectorName, option, locale);
+ }
+
+ if (SingleChoiceOption.class.isAssignableFrom(option.getClass())) {
+ List<?> optionValues = ((SingleChoiceOption<?>) option).getOptionValues();
+ List<ImmutablePair> staticSelectOptions =
+ optionValues.stream()
+ .map(o -> new ImmutablePair(o.toString(), o.toString()))
+ .collect(Collectors.toList());
+ return selectInput(connectorName, option, staticSelectOptions, locale);
+ }
+
+ if (((Class) option.typeReference().getType()).isEnum()) {
+ Object[] enumConstants = ((Class) option.typeReference().getType()).getEnumConstants();
+ List<ImmutablePair> staticSelectOptions =
+ Arrays.stream(enumConstants)
+ .map(o -> new ImmutablePair(o.toString(), o.toString()))
+ .collect(Collectors.toList());
+ return selectInput(connectorName, option, staticSelectOptions, locale);
+ }
+
+ if (((Class) option.typeReference().getType())
+ .getTypeName()
+ .startsWith("org.apache.seatunnel")) {
+ return textareaInput(connectorName, option, locale);
+ }
+
+ // object type and map type will show as textarea
+ throw new UnSupportWrapperException(
+ connectorName, option.key(), option.typeReference().getType().getTypeName());
+ }
+
+ private static boolean enableLabelI18n(
+ String connectorName, String optionI18nKey, FormLocale locale) {
+ String realConnectorName = connectorName;
+ if (connectorName.contains("[")) {
+ realConnectorName = connectorName.substring(0, connectorName.indexOf("["));
+ }
+
+ boolean labelEnableI18n = false;
+ if (CONNECTOR_I18N_CONFIG_ZH.hasPath(realConnectorName)
+ && CONNECTOR_I18N_CONFIG_ZH.getConfig(realConnectorName).hasPath(optionI18nKey)) {
+ locale.addZhCN(
+ optionI18nKey,
+ CONNECTOR_I18N_CONFIG_ZH.getConfig(realConnectorName).getString(optionI18nKey));
+ labelEnableI18n = true;
+ }
+
+ if (CONNECTOR_I18N_CONFIG_EN.hasPath(realConnectorName)
+ && CONNECTOR_I18N_CONFIG_EN.getConfig(realConnectorName).hasPath(optionI18nKey)) {
+ locale.addEnUS(
+ optionI18nKey,
+ CONNECTOR_I18N_CONFIG_EN.getConfig(realConnectorName).getString(optionI18nKey));
+ labelEnableI18n = true;
+ }
+ return labelEnableI18n;
+ }
+
+ private static AbstractFormOption selectInput(
+ String connectorName,
+ @NonNull Option<?> option,
+ List<ImmutablePair> staticSelectOptions,
+ FormLocale locale) {
+ FormOptionBuilder builder = FormOptionBuilder.builder();
+ String i18nOptionKey = option.key().replace(".", "_").replace("-", "_");
+ if (enableLabelI18n(connectorName, i18nOptionKey, locale)) {
+ builder = builder.withI18nLabel(i18nOptionKey);
+ } else {
+ builder = builder.withLabel(option.key());
+ }
+
+ FormOptionBuilder.StaticSelectOptionBuilder staticSelectOptionBuilder =
+ builder.withField(option.key()).staticSelectOptionBuilder();
+
+ for (ImmutablePair selectOption : staticSelectOptions) {
+ if (enableLabelI18n(connectorName, selectOption.getLeft().toString(), locale)) {
+ staticSelectOptionBuilder.addI18nSelectOptions(selectOption);
+ } else {
+ staticSelectOptionBuilder.addSelectOptions(selectOption);
+ }
+ }
+
+ AbstractFormOption abstractFormOption =
+ staticSelectOptionBuilder
+ .formStaticSelectOption()
+ .withDefaultValue(
+ option.defaultValue() == null ? null
+ : option.defaultValue().toString());
+
+ String placeholderI18nOptionKey = i18nOptionKey + "_description";
+ if (enableLabelI18n(connectorName, placeholderI18nOptionKey, locale)) {
+ abstractFormOption = abstractFormOption.withI18nPlaceholder(placeholderI18nOptionKey);
+ } else {
+ abstractFormOption = abstractFormOption.withPlaceholder(option.getDescription());
+ }
+
+ return abstractFormOption;
+ }
+
+ private static AbstractFormOption passwordInput(
+ String connectorName, @NonNull Option<?> option, FormLocale locale) {
+ FormOptionBuilder builder = FormOptionBuilder.builder();
+ String i18nOptionKey = option.key().replace(".", "_").replace("-", "_");
+ if (enableLabelI18n(connectorName, i18nOptionKey, locale)) {
+ builder = builder.withI18nLabel(i18nOptionKey);
+ } else {
+ builder = builder.withLabel(option.key());
+ }
+
+ AbstractFormOption abstractFormOption =
+ builder.withField(option.key())
+ .inputOptionBuilder()
+ .formPasswordInputOption()
+ .withDefaultValue(option.defaultValue());
+
+ String placeholderI18nOptionKey = i18nOptionKey + "_description";
+ if (enableLabelI18n(connectorName, placeholderI18nOptionKey, locale)) {
+ abstractFormOption = abstractFormOption.withI18nPlaceholder(placeholderI18nOptionKey);
+ } else {
+ abstractFormOption = abstractFormOption.withPlaceholder(option.getDescription());
+ }
+
+ return abstractFormOption;
+ }
+
+ private static AbstractFormOption textInput(
+ String connectorName, @NonNull Option<?> option, FormLocale locale) {
+ FormOptionBuilder builder = FormOptionBuilder.builder();
+ String i18nOptionKey = option.key().replace(".", "_").replace("-", "_");
+ String placeholderI18nOptionKey = i18nOptionKey + "_description";
+ if (enableLabelI18n(connectorName, i18nOptionKey, locale)) {
+ builder = builder.withI18nLabel(i18nOptionKey);
+ } else {
+ builder = builder.withLabel(option.key());
+ }
+
+ AbstractFormOption abstractFormOption =
+ builder.withField(option.key())
+ .inputOptionBuilder()
+ .formTextInputOption()
+ .withDefaultValue(option.defaultValue());
+ if (enableLabelI18n(connectorName, placeholderI18nOptionKey, locale)) {
+ abstractFormOption = abstractFormOption.withI18nPlaceholder(placeholderI18nOptionKey);
+ } else {
+ abstractFormOption = abstractFormOption.withPlaceholder(option.getDescription());
+ }
+
+ return abstractFormOption;
+ }
+
+ private static AbstractFormOption textareaInput(
+ String connectorName, @NonNull Option<?> option, FormLocale locale) {
+ FormOptionBuilder builder = FormOptionBuilder.builder();
+ String i18nOptionKey = option.key().replace(".", "_").replace("-", "_");
+ String placeholderI18nOptionKey = i18nOptionKey + "_description";
+
+ if (enableLabelI18n(connectorName, i18nOptionKey, locale)) {
+ builder = builder.withI18nLabel(i18nOptionKey);
+ } else {
+ builder = builder.withLabel(option.key());
+ }
+
+ AbstractFormOption abstractFormOption =
+ builder.withField(option.key())
+ .inputOptionBuilder()
+ .formTextareaInputOption()
+ .withClearable()
+ .withDefaultValue(option.defaultValue());
+ if (enableLabelI18n(connectorName, placeholderI18nOptionKey, locale)) {
+ abstractFormOption = abstractFormOption.withI18nPlaceholder(placeholderI18nOptionKey);
+ } else {
+ abstractFormOption = abstractFormOption.withPlaceholder(option.getDescription());
+ }
+
+ return abstractFormOption;
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java
new file mode 100644
index 00000000..8db5be03
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thridparty/framework/UnSupportWrapperException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thridparty.framework;
+
+import lombok.NonNull;
+
+public class UnSupportWrapperException extends RuntimeException {
+ public UnSupportWrapperException(
+ @NonNull String formName, @NonNull String label, @NonNull String typeName) {
+ super(
+ String.format(
+ "Form: %s, label: %s, typeName: %s not yet supported",
+ formName, label, typeName));
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java
new file mode 100644
index 00000000..56e1af59
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JdbcUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.regex.Pattern;
+
+public class JdbcUtils {
+
+ /**
+ * Get the host from the jdbc url.
+ *
+ * @param url jdbc:clickhouse://localhost:8123
+ * @return localhost:8123
+ */
+ public static String getAddressFromUrl(String url) {
+ checkNotNull(url, "url can not be null");
+ Pattern pattern = Pattern.compile("jdbc:clickhouse:\\/\\/(.*):([0-9]+)(.*)");
+ return pattern.matcher(url).replaceAll("$1:$2");
+ }
+
+ public static String replaceDatabase(String jdbcUrl, String databaseName) {
+ if (databaseName == null) {
+ return jdbcUrl;
+ }
+ String[] split = jdbcUrl.split("\\?");
+ if (split.length == 1) {
+ return replaceDatabaseWithoutParameter(jdbcUrl, databaseName);
+ }
+ return replaceDatabaseWithoutParameter(split[0], databaseName) + "?" + split[1];
+ }
+
+ private static String replaceDatabaseWithoutParameter(String jdbcUrl, String databaseName) {
+ int lastIndex = jdbcUrl.lastIndexOf(':');
+ char[] chars = jdbcUrl.toCharArray();
+ for (int i = lastIndex + 1; i < chars.length; i++) {
+ if (chars[i] == '/') {
+ return jdbcUrl.substring(0, i + 1) + databaseName;
+ }
+ }
+ return jdbcUrl + "/" + databaseName;
+ }
+}
diff --git a/seatunnel-web-dist/release-docs/LICENSE b/seatunnel-web-dist/release-docs/LICENSE
index ad7f6d8c..a4338a4b 100644
--- a/seatunnel-web-dist/release-docs/LICENSE
+++ b/seatunnel-web-dist/release-docs/LICENSE
@@ -222,6 +222,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.12.0 - http://commons.apache.org/proper/commons-lang/)
+ (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.11.0 - https://mvnrepository.com/artifact/commons-io/commons-io/2.11.0)
(Apache License, Version 2.0) config (com.typesafe:config:1.3.3 - https://github.com/lightbend/config)
(The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
(Apache License 2.0) Hibernate Validator Engine (org.hibernate.validator:hibernate-validator:6.2.2.Final - http://hibernate.org/validator/hibernate-validator)
diff --git a/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml b/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
index a13174fc..2260084f 100644
--- a/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
+++ b/seatunnel-web-dist/src/main/assembly/seatunnel-web-ci.xml
@@ -86,6 +86,19 @@
<scope>provided</scope>
</dependencySet>
+ <dependencySet>
+ <useProjectArtifact>false</useProjectArtifact>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <includes>
+ <include>*:*:jar</include>
+ </includes>
+ <excludes>
+ <exclude>org.apache.seatunnel:datasource-*:jar</exclude>
+ </excludes>
+ <outputDirectory>/libs</outputDirectory>
+ </dependencySet>
+
<!-- ============ Logging Jars ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
diff --git a/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml b/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
index 5cc991a4..4a726e68 100644
--- a/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
+++ b/seatunnel-web-dist/src/main/assembly/seatunnel-web.xml
@@ -86,6 +86,19 @@
<scope>provided</scope>
</dependencySet>
+ <dependencySet>
+ <useProjectArtifact>false</useProjectArtifact>
+ <useTransitiveDependencies>true</useTransitiveDependencies>
+ <unpack>false</unpack>
+ <includes>
+ <include>*:*:jar</include>
+ </includes>
+ <excludes>
+ <exclude>org.apache.seatunnel:datasource-*:jar</exclude>
+ </excludes>
+ <outputDirectory>/libs</outputDirectory>
+ </dependencySet>
+
<!-- ============ Logging Jars ============ -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 6b69a0f1..0f6896f7 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -4,8 +4,10 @@ classmate-1.3.1.jar
commons-collections4-4.4.jar
commons-codec-1.13.jar
commons-lang3-3.12.0.jar
+commons-io-2.11.0.jar
config-1.3.3.jar
cron-utils-9.1.6.jar
+gson-2.8.6.jar
guava-19.0.jar
hibernate-validator-6.2.2.Final.jar
jackson-annotations-2.12.7.jar
@@ -53,6 +55,8 @@ seatunnel-common-2.3.1.jar
seatunnel-config-base-2.3.1.jar
seatunnel-config-shade-2.3.1.jar
seatunnel-jackson-2.3.1-optional.jar
+seatunnel-api-2.3.1.jar
+seatunnel-plugin-discovery-2.3.1.jar
slf4j-api-1.7.25.jar
snakeyaml-1.29.jar
spring-aop-5.3.20.jar