You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/05/30 06:12:26 UTC

[dolphinscheduler] branch dev updated: [Feature][Datasource]replace DataSourceController API with string JSON and cast to DTO (#10276)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 592e433b1e [Feature][Datasource]replace DataSourceController API with string JSON and cast to DTO (#10276)
592e433b1e is described below

commit 592e433b1e2eb1e524b13a4907c568d0f4ed1733
Author: Tq <ti...@gmail.com>
AuthorDate: Mon May 30 14:12:19 2022 +0800

    [Feature][Datasource]replace DataSourceController API with string JSON and cast to DTO (#10276)
    
    * replace DataSourceController API with string JSON and cast to DTO
    
    * add DataSourceProcessorManager and DataSourceProcessorProvider to datasource processor functions
---
 .../dolphinscheduler/api/ApiApplicationServer.java |  1 -
 .../api/controller/DataSourceController.java       | 68 +++++++++++----------
 .../api/datasource/BaseDataSourceParamDTO.java     | 16 -----
 .../api/datasource/DataSourceProcessor.java        | 15 +++++
 .../clickhouse/ClickHouseDataSourceProcessor.java  | 17 +++++-
 .../api/datasource/db2/Db2DataSourceProcessor.java | 16 ++++-
 .../datasource/hive/HiveDataSourceProcessor.java   | 16 ++++-
 .../datasource/mysql/MySQLDataSourceProcessor.java | 21 +++++--
 .../oracle/OracleDataSourceProcessor.java          | 18 +++++-
 .../postgresql/PostgreSQLDataSourceProcessor.java  | 18 +++++-
 .../presto/PrestoDataSourceProcessor.java          | 18 +++++-
 .../redshift/RedshiftDataSourceProcessor.java      | 26 +++++---
 .../datasource/spark/SparkDataSourceProcessor.java | 16 ++++-
 .../sqlserver/SQLServerDataSourceProcessor.java    | 18 +++++-
 .../api/plugin/DataSourceProcessorManager.java     | 63 ++++++++++++++++++++
 .../api/plugin/DataSourceProcessorProvider.java    | 52 ++++++++++++++++
 .../datasource/api/utils/DataSourceUtils.java      | 69 +++++++---------------
 .../apache/dolphinscheduler/spi/enums/DbType.java  | 11 +++-
 18 files changed, 347 insertions(+), 132 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 9e6aa94530..7c3532f7f3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.api;
 
 import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
index ab9d1c29c0..fdaebaa73e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
@@ -17,19 +17,11 @@
 
 package org.apache.dolphinscheduler.api.controller;
 
-import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE;
-import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE;
-import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
-import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
-import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
-import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE;
-import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR;
-import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE;
-
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
@@ -43,9 +35,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSour
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
-
-import java.util.Map;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.DeleteMapping;
@@ -59,14 +48,23 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
 import springfox.documentation.annotations.ApiIgnore;
 
+import java.util.Map;
+
+import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE;
+import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE;
+import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
+import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
+import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
+import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE;
+import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE;
+
 /**
  * data source controller
  */
@@ -82,7 +80,8 @@ public class DataSourceController extends BaseController {
      * create data source
      *
      * @param loginUser login user
-     * @param dataSourceParam datasource param
+     * @param jsonStr   datasource param
+     *                  example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
      * @return create result code
      */
     @ApiOperation(value = "createDataSource", notes = "CREATE_DATA_SOURCE_NOTES")
@@ -92,7 +91,8 @@ public class DataSourceController extends BaseController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result createDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                    @ApiParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true)
-                                   @RequestBody BaseDataSourceParamDTO dataSourceParam) {
+                                   @RequestBody String jsonStr) {
+        BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
         return dataSourceService.createDataSource(loginUser, dataSourceParam);
     }
 
@@ -100,14 +100,15 @@ public class DataSourceController extends BaseController {
      * updateProcessInstance data source
      *
      * @param loginUser login user
-     * @param id datasource id
-     * @param dataSourceParam datasource param
+     * @param id        datasource id
+     * @param jsonStr   datasource param
+     *                  example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
      * @return update result code
      */
     @ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES")
     @ApiImplicitParams({
-        @ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"),
-        @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
+            @ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"),
+            @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
     })
     @PutMapping(value = "/{id}")
     @ResponseStatus(HttpStatus.OK)
@@ -115,7 +116,8 @@ public class DataSourceController extends BaseController {
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result updateDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                    @PathVariable(value = "id") Integer id,
-                                   @RequestBody BaseDataSourceParamDTO dataSourceParam) {
+                                   @RequestBody String jsonStr) {
+        BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
         dataSourceParam.setId(id);
         return dataSourceService.updateDataSource(dataSourceParam.getId(), loginUser, dataSourceParam);
     }
@@ -199,19 +201,21 @@ public class DataSourceController extends BaseController {
      * connect datasource
      *
      * @param loginUser login user
-     * @param dataSourceParam datasource param
+     * @param jsonStr   datasource param
+     *                  example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
      * @return connect result code
      */
     @ApiOperation(value = "connectDataSource", notes = "CONNECT_DATA_SOURCE_NOTES")
     @ApiImplicitParams({
-        @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
+            @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
     })
     @PostMapping(value = "/connect")
     @ResponseStatus(HttpStatus.OK)
     @ApiException(CONNECT_DATASOURCE_FAILURE)
     @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
     public Result connectDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
-                                    @RequestBody BaseDataSourceParamDTO dataSourceParam) {
+                                    @RequestBody String jsonStr) {
+        BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
         DataSourceUtils.checkDatasourceParam(dataSourceParam);
         ConnectionParam connectionParams = DataSourceUtils.buildConnectionParams(dataSourceParam);
         return dataSourceService.checkConnection(dataSourceParam.getType(), connectionParams);
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java
index 7d9cbb2cf6..28fe5c3c0b 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java
@@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import java.io.Serializable;
 import java.util.Map;
 
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
 /**
  * Basic datasource params submitted to api.
  * <p>
@@ -49,19 +46,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
  * see {@link PrestoDataSourceParamDTO}
  * see {@link RedshiftDataSourceParamDTO}
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
-    @JsonSubTypes.Type(value = MySQLDataSourceParamDTO.class, name = "MYSQL"),
-    @JsonSubTypes.Type(value = PostgreSQLDataSourceParamDTO.class, name = "POSTGRESQL"),
-    @JsonSubTypes.Type(value = HiveDataSourceParamDTO.class, name = "HIVE"),
-    @JsonSubTypes.Type(value = SparkDataSourceParamDTO.class, name = "SPARK"),
-    @JsonSubTypes.Type(value = ClickHouseDataSourceParamDTO.class, name = "CLICKHOUSE"),
-    @JsonSubTypes.Type(value = OracleDataSourceParamDTO.class, name = "ORACLE"),
-    @JsonSubTypes.Type(value = SQLServerDataSourceParamDTO.class, name = "SQLSERVER"),
-    @JsonSubTypes.Type(value = Db2DataSourceParamDTO.class, name = "DB2"),
-    @JsonSubTypes.Type(value = PrestoDataSourceParamDTO.class, name = "PRESTO"),
-    @JsonSubTypes.Type(value = RedshiftDataSourceParamDTO.class, name = "REDSHIFT"),
-})
 public abstract class BaseDataSourceParamDTO implements Serializable {
 
     protected Integer id;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java
index e4603165b7..3cfac25663 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java
@@ -26,6 +26,15 @@ import java.sql.SQLException;
 
 public interface DataSourceProcessor {
 
+    /**
+     * cast JSON to relate DTO
+     *
+     * @param paramJson
+     * @return {@link BaseDataSourceParamDTO}
+     */
+    BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson);
+
+
     /**
      * check datasource param is valid
      */
@@ -33,6 +42,7 @@ public interface DataSourceProcessor {
 
     /**
      * get Datasource Client UniqueId
+     *
      * @return UniqueId
      */
     String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType);
@@ -90,4 +100,9 @@ public interface DataSourceProcessor {
      * @return {@link DbType}
      */
     DbType getDbType();
+
+    /**
+     * get datasource processor
+     */
+    DataSourceProcessor create();
 }
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java
index ea71d25a8d..509b9221d5 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -26,16 +29,21 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.MapUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, ClickHouseDataSourceParamDTO.class);
+    }
+
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) createConnectionParams(connectionJson);
@@ -110,6 +118,11 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.CLICKHOUSE;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new ClickHouseDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java
index 7e0548c71f..16d326fecb 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -27,16 +30,20 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.MapUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, Db2DataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         Db2ConnectionParam connectionParams = (Db2ConnectionParam) createConnectionParams(connectionJson);
@@ -106,6 +113,11 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.DB2;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new Db2DataSourceProcessor();
+    }
+
     @Override
     public String getValidationQuery() {
         return Constants.DB2_VALIDATION_QUERY;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java
index 898ceb39aa..8c3d94eac6 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.MapUtils;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -37,8 +38,14 @@ import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, HiveDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         HiveDataSourceParamDTO hiveDataSourceParamDTO = new HiveDataSourceParamDTO();
@@ -136,6 +143,11 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.HIVE;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new HiveDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java
index a0c5347a7c..a4d4ec08eb 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -26,8 +29,8 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.collections4.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -36,9 +39,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+@AutoService(DataSourceProcessor.class)
 public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(MySQLDataSourceProcessor.class);
@@ -53,6 +54,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
 
     private static final String APPEND_PARAMS = "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false";
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         MySQLConnectionParam connectionParams = (MySQLConnectionParam) createConnectionParams(connectionJson);
@@ -138,6 +144,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.MYSQL;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new MySQLDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> paramMap) {
         if (MapUtils.isEmpty(paramMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java
index 44c4ab8a46..b57f6a97f8 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java
@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -27,9 +31,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -38,8 +39,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, OracleDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         OracleConnectionParam connectionParams = (OracleConnectionParam) createConnectionParams(connectionJson);
@@ -128,6 +135,11 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.ORACLE;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new OracleDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java
index f14302f1e7..55ebcc4bb5 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java
@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, PostgreSQLDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         PostgreSQLConnectionParam connectionParams = (PostgreSQLConnectionParam) createConnectionParams(connectionJson);
@@ -111,6 +118,11 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.POSTGRESQL;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new PostgreSQLDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java
index 8fe1a7231c..bf324eaffc 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java
@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -37,8 +38,14 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, PrestoDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         PrestoConnectionParam connectionParams = (PrestoConnectionParam) createConnectionParams(connectionJson);
@@ -113,6 +120,11 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.PRESTO;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new PrestoDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isNotEmpty(otherMap)) {
             List<String> list = new ArrayList<>();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java
index 5a9c521fce..e002791b76 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java
@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -37,18 +38,24 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, RedshiftDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         RedshiftConnectionParam
-            connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson);
+                connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson);
 
         String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
         String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
 
         RedshiftDataSourceParamDTO
-            redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO();
+                redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO();
         redshiftDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
         redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
         redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
@@ -65,7 +72,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
         String jdbcUrl = address + Constants.SLASH + redshiftParam.getDatabase();
 
         RedshiftConnectionParam
-            redshiftConnectionParam = new RedshiftConnectionParam();
+                redshiftConnectionParam = new RedshiftConnectionParam();
         redshiftConnectionParam.setUser(redshiftParam.getUserName());
         redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword()));
         redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther()));
@@ -97,7 +104,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
     @Override
     public String getJdbcUrl(ConnectionParam connectionParam) {
         RedshiftConnectionParam
-            redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
+                redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
         if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) {
             return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther());
         }
@@ -117,6 +124,11 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.REDSHIFT;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new RedshiftDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isNotEmpty(otherMap)) {
             List<String> list = new ArrayList<>(otherMap.size());
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java
index 69fcd4ff24..2d25f427b0 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.MapUtils;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -40,8 +41,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+@AutoService(DataSourceProcessor.class)
 public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, SparkDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         SparkConnectionParam connectionParams = (SparkConnectionParam) createConnectionParams(connectionJson);
@@ -138,6 +145,11 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.SPARK;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new SparkDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java
index 44855d1fb6..e33c3c56fd 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java
@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver;
 
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang.StringUtils;
-
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+@AutoService(DataSourceProcessor.class)
 public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
 
+    @Override
+    public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+        return JSONUtils.parseObject(paramJson, SQLServerDataSourceParamDTO.class);
+    }
+
     @Override
     public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
         SQLServerConnectionParam connectionParams = (SQLServerConnectionParam) createConnectionParams(connectionJson);
@@ -109,6 +116,11 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
         return DbType.SQLSERVER;
     }
 
+    @Override
+    public DataSourceProcessor create() {
+        return new SQLServerDataSourceProcessor();
+    }
+
     private String transformOther(Map<String, String> otherMap) {
         if (MapUtils.isEmpty(otherMap)) {
             return null;
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
new file mode 100644
index 0000000000..de55f383a3
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
+
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.lang.String.format;
+
+public class DataSourceProcessorManager {
+    private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorManager.class);
+
+    private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
+
+    public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
+        return Collections.unmodifiableMap(dataSourceProcessorMap);
+    }
+
+    public void installProcessor() {
+        final Set<String> names = new HashSet<>();
+
+        ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
+            final String name = factory.getDbType().name();
+
+            logger.info("start register processor: {}", name);
+            if (!names.add(name)) {
+                throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
+            }
+            loadDatasourceClient(factory);
+
+            logger.info("done register processor: {}", name);
+
+        });
+    }
+
+    private void loadDatasourceClient(DataSourceProcessor processor) {
+        DataSourceProcessor instance = processor.create();
+        dataSourceProcessorMap.put(processor.getDbType().name(), instance);
+    }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
new file mode 100644
index 0000000000..7cdd5c98d1
--- /dev/null
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
+
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+public class DataSourceProcessorProvider {
+    private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorProvider.class);
+
+    private DataSourceProcessorManager dataSourcePluginManager;
+
+    private DataSourceProcessorProvider() {
+        initDataSourceProcessorPlugin();
+    }
+
+    private static class DataSourceClientProviderHolder {
+        private static final DataSourceProcessorProvider INSTANCE = new DataSourceProcessorProvider();
+    }
+
+    public static DataSourceProcessorProvider getInstance() {
+        return DataSourceClientProviderHolder.INSTANCE;
+    }
+
+    public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
+        return dataSourcePluginManager.getDataSourceProcessorMap();
+    }
+
+    private void initDataSourceProcessorPlugin() {
+        dataSourcePluginManager = new DataSourceProcessorManager();
+        dataSourcePluginManager.installProcessor();
+    }
+}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
index edcd1a332d..b4a85e4674 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
@@ -17,44 +17,26 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.utils;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse.ClickHouseDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2.Db2DataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive.HiveDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle.OracleDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql.PostgreSQLDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto.PrestoDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift.RedshiftDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark.SparkDataSourceProcessor;
-import org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver.SQLServerDataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
-
-import java.sql.Connection;
-
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.util.Map;
+
 public class DataSourceUtils {
 
-    private DataSourceUtils() {
+    public DataSourceUtils() {
     }
 
     private static final Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
 
-    private static final DataSourceProcessor mysqlProcessor = new MySQLDataSourceProcessor();
-    private static final DataSourceProcessor postgreSqlProcessor = new PostgreSQLDataSourceProcessor();
-    private static final DataSourceProcessor hiveProcessor = new HiveDataSourceProcessor();
-    private static final DataSourceProcessor sparkProcessor = new SparkDataSourceProcessor();
-    private static final DataSourceProcessor clickhouseProcessor = new ClickHouseDataSourceProcessor();
-    private static final DataSourceProcessor oracleProcessor = new OracleDataSourceProcessor();
-    private static final DataSourceProcessor sqlServerProcessor = new SQLServerDataSourceProcessor();
-    private static final DataSourceProcessor db2PROCESSOR = new Db2DataSourceProcessor();
-    private static final DataSourceProcessor prestoPROCESSOR = new PrestoDataSourceProcessor();
-    private static final DataSourceProcessor redshiftProcessor = new RedshiftDataSourceProcessor();
-
     /**
      * check datasource param
      *
@@ -103,30 +85,11 @@ public class DataSourceUtils {
     }
 
     public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
-        switch (dbType) {
-            case MYSQL:
-                return mysqlProcessor;
-            case POSTGRESQL:
-                return postgreSqlProcessor;
-            case HIVE:
-                return hiveProcessor;
-            case SPARK:
-                return sparkProcessor;
-            case CLICKHOUSE:
-                return clickhouseProcessor;
-            case ORACLE:
-                return oracleProcessor;
-            case SQLSERVER:
-                return sqlServerProcessor;
-            case DB2:
-                return db2PROCESSOR;
-            case PRESTO:
-                return prestoPROCESSOR;
-            case REDSHIFT:
-                return redshiftProcessor;
-            default:
-                throw new IllegalArgumentException("datasource type illegal:" + dbType);
+        Map<String, DataSourceProcessor> dataSourceProcessorMap = DataSourceProcessorProvider.getInstance().getDataSourceProcessorMap();
+        if (!dataSourceProcessorMap.containsKey(dbType.name())) {
+            throw new IllegalArgumentException("illegal datasource type");
         }
+        return dataSourceProcessorMap.get(dbType.name());
     }
 
     /**
@@ -135,4 +98,14 @@ public class DataSourceUtils {
     public static String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
         return getDatasourceProcessor(dbType).getDatasourceUniqueId(connectionParam, dbType);
     }
+
+    /**
+     * build connection url
+     */
+    public static BaseDataSourceParamDTO buildDatasourceParam(String param) {
+        JsonNode jsonNodes = JSONUtils.parseObject(param);
+
+        return getDatasourceProcessor(DbType.ofName(jsonNodes.get("type").asText().toUpperCase()))
+                .castDatasourceParamDTO(param);
+    }
 }
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 099ec7f767..869111621a 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -17,13 +17,14 @@
 
 package org.apache.dolphinscheduler.spi.enums;
 
-import static java.util.stream.Collectors.toMap;
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.google.common.base.Functions;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-import com.baomidou.mybatisplus.annotation.EnumValue;
-import com.google.common.base.Functions;
+import static java.util.stream.Collectors.toMap;
 
 public enum DbType {
     MYSQL(0, "mysql"),
@@ -66,6 +67,10 @@ public enum DbType {
         return null;
     }
 
+    public static DbType ofName(String name) {
+        return Arrays.stream(DbType.values()).filter(e -> e.name().equals(name)).findFirst().orElseThrow(() -> new NoSuchElementException("no such db type"));
+    }
+
     public boolean isHive() {
         return this == DbType.HIVE;
     }