Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/14 02:47:57 UTC

[incubator-linkis] branch dev-1.2.0 updated: [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new ae61ee927 [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)
ae61ee927 is described below

commit ae61ee9272ef40362542b0d8b3f350db6b521092
Author: weixiao <le...@gmail.com>
AuthorDate: Tue Jun 14 10:47:51 2022 +0800

    [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)
    
    * [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links.
    
    * [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize code.
    
    * [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize linkis_dml.sql.
    
    * [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize code format.
    
    * [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and add CN and EN data source form labels and descriptions.
---
 db/linkis_dml.sql                                  |  14 +-
 .../engineconn-plugins/jdbc/pom.xml                |  20 +++
 .../jdbc/JDBCDataSourceConfigurations.java         |  13 +-
 .../engineplugin/jdbc/PropertiesParser.java        |   6 +
 .../jdbc/constant/JDBCEngineConnConstant.java      |  15 ++
 .../jdbc/executer/JDBCEngineConnExecutor.scala     |  26 ++-
 .../jdbc/executer/JDBCMultiDatasourceParser.scala  | 175 +++++++++++++++++++++
 .../executer/JDBCMultiDatasourceParserTest.scala   | 116 ++++++++++++++
 8 files changed, 372 insertions(+), 13 deletions(-)
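
A note on how the feature is driven: the executor reads the target data source from two new runtime properties (see JDBCEngineConnConstant below). A minimal sketch of the properties a client-side job request might carry (the submission mechanics are outside this diff; the data source name "my_mysql_ds" and system identifier "my_system" are illustrative):

    // Hypothetical runtime properties for a JDBC task; only the two keys are defined in this commit.
    val runtimeProperties = new java.util.HashMap[String, String]()
    runtimeProperties.put("wds.linkis.engine.runtime.datasource", "my_mysql_ds")
    runtimeProperties.put("wds.linkis.engine.runtime.datasource.systemQueryParam", "my_system")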

diff --git a/db/linkis_dml.sql b/db/linkis_dml.sql
index 2b0942913..a268bc5a0 100644
--- a/db/linkis_dml.sql
+++ b/db/linkis_dml.sql
@@ -456,12 +456,14 @@ INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `cl
 INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('mongodb', 'default', 'default', 'DEFAULT', NULL, 3);
 
 
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'host', 'Host', NULL, 'TEXT', NULL, 1, 'mysql Host ', NULL, NULL, NULL, NULL,  now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'port', '端口', NULL, 'TEXT', NULL, 1, '端口', NULL, NULL, NULL, NULL,  now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'params', '连接参数', NULL, 'TEXT', NULL, 0, '输入JSON格式: {"param":"value"}', NULL, NULL, NULL, NULL,  now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'username', '用户名', NULL, 'TEXT', NULL, 1, '用户名', '^[0-9A-Za-z_-]+$', NULL, NULL, NULL,  now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'password', '密码', NULL, 'PASSWORD', NULL, 1, '密码', '', NULL, NULL, NULL,  now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (4, 'envId', '集群环境', NULL, 'SELECT', NULL, 1, '集群环境', NULL, NULL, NULL, '/data-source-manager/env-list/all/type/4', now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'host', '主机名(Host)', NULL, 'TEXT', NULL, 1, '主机名(Host)', NULL, NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'port', '端口号(Port)', NULL, 'TEXT', NULL, 1, '端口号(Port)', NULL, NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'driverClassName', '驱动类名(Driver class name)', 'com.mysql.jdbc.Driver', 'TEXT', NULL, 1, '驱动类名(Driver class name)', NULL, NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'params', '连接参数(Connection params)', NULL, 'TEXT', NULL, 0, '输入JSON格式(Input JSON format): {"param":"value"}', NULL, NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'username', '用户名(Username)', NULL, 'TEXT', NULL, 1, '用户名(Username)', '^[0-9A-Za-z_-]+$', NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'password', '密码(Password)', NULL, 'PASSWORD', NULL, 1, '密码(Password)', '', NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'databaseName', '数据库名(Database name)', NULL, 'TEXT', NULL, 0, '数据库名(Database name)', NULL, NULL, NULL, NULL,  now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (4, 'envId', '集群环境(Cluster env)', NULL, 'SELECT', NULL, 1, '集群环境(Cluster env)', NULL, NULL, NULL, '/data-source-manager/env-list/all/type/4', now(), now());
 
 
 INSERT INTO `linkis_ps_dm_datasource_env` (`env_name`, `env_desc`, `datasource_type_id`, `parameter`, `create_time`, `create_user`, `modify_time`, `modify_user`) VALUES ('测试环境SIT', '测试环境SIT', 4, '{"uris":"thrift://localhost:9083", "hadoopConf":{"hive.metastore.execute.setugi":"true"}}',  now(), NULL,  now(), NULL);
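
For reference, the new "params" field above expects a flat JSON object of extra JDBC properties, and the other keys line up with the DS_JDBC_* constants added below. An illustrative set of connect params, in the style of the unit test at the end of this commit (all values made up):

    // Illustrative connect params; the keys mirror the seed data above.
    val dbConnParams = new java.util.HashMap[String, Object]()
    dbConnParams.put("host", "localhost")
    dbConnParams.put("port", "3306")
    dbConnParams.put("driverClassName", "com.mysql.jdbc.Driver")
    dbConnParams.put("databaseName", "my_db")
    dbConnParams.put("params", """{"useSSL": "false"}""")
    dbConnParams.put("username", "my_user")
    dbConnParams.put("password", "my_pass")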
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
index 5a28641d0..461a3f56e 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
@@ -178,6 +178,26 @@
             <version>${druid.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.linkis</groupId>
+            <artifactId>linkis-datasource-client</artifactId>
+            <version>${linkis.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
     </dependencies>
     <build>
         <plugins>
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
index 8b922f2dc..c13939b1f 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
@@ -17,12 +17,16 @@
 
 package org.apache.linkis.manager.engineplugin.jdbc;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class JDBCDataSourceConfigurations {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCDataSourceConfigurations.class);
     private final Map<String, Statement> taskIdStatementMap;
 
     public JDBCDataSourceConfigurations() {
@@ -36,6 +40,7 @@ public class JDBCDataSourceConfigurations {
             }
         }
         taskIdStatementMap.clear();
+        LOG.info("The jdbc task statement map has be cleared successfully!");
     }
 
     public void saveStatement(String taskId, Statement statement) {
@@ -43,10 +48,16 @@ public class JDBCDataSourceConfigurations {
     }
 
     public void cancelStatement(String taskId) throws SQLException {
-        taskIdStatementMap.get(taskId).cancel();
+        LOG.info("Starting to cancel the statement of task {} ...", taskId);
+        Statement statement = taskIdStatementMap.get(taskId);
+        if (statement != null) {
+            statement.cancel();
+        }
+        LOG.info("Finished cancel the statement of task {}.", taskId);
     }
 
     public void removeStatement(String taskId) {
         taskIdStatementMap.remove(taskId);
+        LOG.info("Finished remove the statement of task {}", taskId);
     }
 }
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
index 5bc15dcab..2ab943e56 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
@@ -23,6 +23,12 @@ import java.util.Map;
 
 public abstract class PropertiesParser {
     interface TypeConversion<T> {
+        /**
+         * Converts a String value into the target type T.
+         *
+         * @param oriV the original String value
+         * @return the converted value of type T
+         */
         T convertTo(String oriV);
     }
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index 0ccc4b49e..305643f67 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -36,6 +36,8 @@ public class JDBCEngineConnConstant {
     public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
     public static final String JDBC_SCRIPTS_EXEC_USER = "execUser";
     public static final String JDBC_ENGINE_RUN_TIME_DS = "wds.linkis.engine.runtime.datasource";
+    public static final String JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM =
+            "wds.linkis.engine.runtime.datasource.systemQueryParam";
 
     public static final String JDBC_POOL_TEST_ON_BORROW = "wds.linkis.jdbc.pool.testOnBorrow";
     public static final String JDBC_POOL_TEST_ON_RETURN = "wds.linkis.jdbc.pool.testOnReturn";
@@ -57,5 +59,18 @@ public class JDBCEngineConnConstant {
     public static final String JDBC_POOL_REMOVE_ABANDONED_TIMEOUT =
             "wds.linkis.jdbc.pool.remove.abandoned.timeout";
 
+    public static final String DS_JDBC_HOST = "host";
+    public static final String DS_JDBC_PORT = "port";
+    public static final String DS_JDBC_DB_NAME = "databaseName";
+    public static final String DS_JDBC_USERNAME = "username";
+    public static final String DS_JDBC_PASSWORD = "password";
+    public static final String DS_JDBC_ENABLE_KERBEROS = "enableKerberos";
+    public static final String DS_JDBC_KERBEROS_PRINCIPAL = "kerberosPrincipal";
+    public static final String DS_JDBC_KERBEROS_KEYTAB = "kerberosKeytab";
+    public static final String DS_JDBC_ENABLE_KERBEROS_PROXY_USER = "enableKerberosProxyUser";
+    public static final String DS_JDBC_KERBEROS_PROXY_USER_PROPERTY = "kerberosProxyUserProperty";
+    public static final String DS_JDBC_PARAMS = "params";
+    public static final String DS_JDBC_DRIVER = "driverClassName";
+
     public static final String JDBC_ENGINE_MEMORY_UNIT = "g";
 }
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 3bb05236a..4a84fb154 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -44,6 +44,7 @@ import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
 
 import scala.collection.JavaConversions._
+
 import scala.collection.mutable.ArrayBuffer
 
 class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -68,13 +69,21 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
     val taskId = engineExecutorContext.getJobId.get
     val properties = engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
     var dataSourceName = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
+    val dataSourceQuerySystemParam = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM, "")
 
     if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
       info(s"The jdbc url is empty, adding now...")
-      val globalConfig = Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
+      val globalConfig: util.Map[String, String] = Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
       if (StringUtils.isNotBlank(dataSourceName)) {
         info("Start getting data source connection parameters from the data source hub.")
-        // todo get data source info by data source client
+        Utils.tryCatch {
+          val dataSourceInfo = JDBCMultiDatasourceParser.queryDatasourceInfoByName(dataSourceName, execSqlUser, dataSourceQuerySystemParam)
+          if (dataSourceInfo != null && !dataSourceInfo.isEmpty) {
+            globalConfig.putAll(dataSourceInfo)
+          }
+        } {
+          e: Throwable => return ErrorExecuteResponse(s"Failed to get datasource info about [$dataSourceName] from datasource server.", e)
+        }
       }
       properties.put(JDBCEngineConnConstant.JDBC_URL, globalConfig.get(JDBCEngineConnConstant.JDBC_URL))
       properties.put(JDBCEngineConnConstant.JDBC_DRIVER, globalConfig.get(JDBCEngineConnConstant.JDBC_DRIVER))
@@ -95,16 +104,19 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
     var connection: Connection = null
     var statement: Statement = null
     var resultSet: ResultSet = null
-
+    info(s"The data source properties is $properties")
     Utils.tryCatch({
       connection = connectionManager.getConnection(dataSourceName, properties)
+      info("The jdbc connection has created successfully!")
     }) {
-      case e: Exception => return ErrorExecuteResponse("created data source connection error.", e)
+      e: Throwable =>
+        error(s"created data source connection error! $e")
+        return ErrorExecuteResponse("created data source connection error!", e)
     }
 
     try {
-      statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
       statement = connection.createStatement()
+      statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
       statement.setFetchSize(outputPrintLimit)
       statement.setMaxRows(outputPrintLimit)
       info(s"create statement is:  $statement")
@@ -131,7 +143,9 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
         }
       }
     } catch {
-      case e: Throwable => error(s"Cannot run $code", e)
+      case e: Throwable =>
+        error(s"Cannot run $code", e)
+        return ErrorExecuteResponse(e.getMessage, e)
     } finally {
       if (connection != null) {
         try {
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
new file mode 100644
index 000000000..120f1e470
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.executer
+
+import java.util
+import org.apache.commons.lang.StringUtils
+import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils}
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient
+import org.apache.linkis.datasource.client.request.GetInfoByDataSourceNameAction
+import org.apache.linkis.datasourcemanager.common.domain.DataSource
+import org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType
+import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+import org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException
+import scala.collection.JavaConversions._
+
+object JDBCMultiDatasourceParser extends Logging {
+
+  def queryDatasourceInfoByName(datasourceName: String, username: String, system: String): util.Map[String, String] = {
+    info(s"Starting query [$system, $username, $datasourceName] datasource info ......")
+    val dataSourceClient = new LinkisDataSourceRemoteClient()
+    var dataSource: DataSource = null
+    Utils.tryCatch {
+      dataSource = dataSourceClient.getInfoByDataSourceName(GetInfoByDataSourceNameAction.builder()
+        .setSystem(system)
+        .setDataSourceName(datasourceName)
+        .setUser(username)
+        .build()).getDataSource
+    } {
+      case e: Exception => warn(s"Failed to get data source info: $e")
+    }
+    queryDatasourceInfo(datasourceName, dataSource)
+  }
+
+  def queryDatasourceInfo(datasourceName: String, dataSource: DataSource): util.Map[String, String] = {
+    val dsConnInfo = new util.HashMap[String, String]()
+
+    if (strObjIsBlank(dataSource)) {
+      throw JDBCParamsIllegalException(s"Data source [$datasourceName] info not found!")
+    }
+
+    if (dataSource.getPublishedVersionId == null || dataSource.getPublishedVersionId <= 0) {
+      throw JDBCParamsIllegalException(s"Data source [$datasourceName] not yet published!")
+    }
+
+    if (dataSource.isExpire) {
+      throw JDBCParamsIllegalException(s"Data source [$datasourceName] is expired!")
+    }
+
+    if (dataSource.getDataSourceType == null || StringUtils.isBlank(dataSource.getDataSourceType.getName)) {
+      throw JDBCParamsIllegalException("The data source jdbc type cannot be null!")
+    }
+
+    val dbType = dataSource.getDataSourceType.getName
+    val dbConnParams = dataSource.getConnectParams
+    if (dbConnParams == null || dbConnParams.isEmpty) {
+      throw JDBCParamsIllegalException("The data source jdbc connection info cannot be null!")
+    }
+
+    val driverClassName = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_DRIVER)
+    if (strObjIsBlank(driverClassName)) {
+      throw JDBCParamsIllegalException("The data source jdbc driverClassName cannot be null!")
+    }
+
+    val jdbcUrl = createJdbcUrl(dbType, dbConnParams)
+    info(s"The url parsed from the data source connection information is $jdbcUrl")
+    dsConnInfo.put(JDBCEngineConnConstant.JDBC_URL, jdbcUrl)
+    dsConnInfo.put(JDBCEngineConnConstant.JDBC_DRIVER, driverClassName.toString)
+    appendJdbcAuthType(dbConnParams, dsConnInfo)
+  }
+
+  def createJdbcUrl(dbType: String, dbConnParams: util.Map[String, Object]): String = {
+    val host = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_HOST)
+    if (strObjIsBlank(host)) {
+      throw JDBCParamsIllegalException("The data source jdbc connection host cannot be null!")
+    }
+    val port = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PORT)
+    if (strObjIsBlank(port)) {
+      throw JDBCParamsIllegalException("The data source jdbc connection port cannot be null!")
+    }
+    var jdbcUrl = s"jdbc:$dbType://$host:$port"
+    val dbName = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_DB_NAME)
+    if (strObjIsNotBlank(dbName)) {
+      jdbcUrl = s"$jdbcUrl/$dbName"
+    }
+
+    val params = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PARAMS)
+    val paramsMap = if (strObjIsNotBlank(params)) convertJsonStrToMap(params.toString) else new util.HashMap[String, Object]()
+
+    if (!paramsMap.isEmpty) {
+      val headConf = paramsMap.head
+      jdbcUrl = s"$jdbcUrl?${headConf._1}=${headConf._2}"
+      paramsMap.remove(headConf._1)
+    }
+
+    if (!paramsMap.isEmpty) {
+      val paramsJoin = (for ((k, v) <- paramsMap) yield s"$k=${v.toString}").mkString("&") // join remaining params with '&'
+      jdbcUrl = s"$jdbcUrl&$paramsJoin"
+    }
+
+    jdbcUrl
+  }
+
+  def appendJdbcAuthType(dbConnParams: util.Map[String, Object], dsConnInfo: util.HashMap[String, String]): util.HashMap[String, String] = {
+    val username = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_USERNAME)
+    val password = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PASSWORD)
+    val enableKerberos = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS)
+    val kerberosPrincipal = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PRINCIPAL)
+    val kerberosKeytab = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_KEYTAB)
+    var authType: JdbcAuthType = JdbcAuthType.SIMPLE
+    if (strObjIsNotBlank(username) && strObjIsNotBlank(password)) {
+      authType = JdbcAuthType.USERNAME
+    } else {
+      if (strObjIsNotBlank(enableKerberos) && enableKerberos.toString.toBoolean) {
+        authType = JdbcAuthType.KERBEROS
+        if (strObjIsBlank(kerberosPrincipal)) {
+          throw JDBCParamsIllegalException("In the jdbc authentication mode of kerberos, the kerberos principal cannot be empty!")
+        }
+        if (strObjIsBlank(kerberosKeytab)) {
+          throw JDBCParamsIllegalException("In the jdbc authentication mode of kerberos, the kerberos keytab cannot be empty!")
+        }
+      } else {
+        authType = JdbcAuthType.SIMPLE
+      }
+    }
+
+    authType match {
+      case JdbcAuthType.SIMPLE =>
+        info("jdbc simple auth type.")
+      case JdbcAuthType.USERNAME =>
+        dsConnInfo.put(JDBCEngineConnConstant.JDBC_USERNAME, username.toString)
+        dsConnInfo.put(JDBCEngineConnConstant.JDBC_PASSWORD, password.toString)
+      case JdbcAuthType.KERBEROS =>
+        dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL, kerberosPrincipal.toString)
+        dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION, kerberosKeytab.toString)
+        val enableKerberosProxyUser = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER)
+        if (strObjIsNotBlank(enableKerberosProxyUser)) {
+          dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE, enableKerberosProxyUser.toString)
+        }
+        val kerberosProxyUserProperty = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PROXY_USER_PROPERTY)
+        if (strObjIsNotBlank(kerberosProxyUserProperty)) {
+          dsConnInfo.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, kerberosProxyUserProperty.toString)
+        }
+      case _ => throw JDBCParamsIllegalException(s"Unsupported authentication type ${authType.getAuthType}")
+    }
+    dsConnInfo.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, authType.getAuthType)
+    dsConnInfo
+  }
+
+  private def convertJsonStrToMap(jsonStr: String): util.Map[String, Object] = {
+    JsonUtils.jackson.readValue(jsonStr, classOf[util.Map[String, Object]])
+  }
+
+  private def strObjIsNotBlank(str: Object): Boolean = {
+    str != null && StringUtils.isNotBlank(str.toString)
+  }
+
+  private def strObjIsBlank(str: Object): Boolean = {
+    !strObjIsNotBlank(str)
+  }
+}
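
Given connect params like the illustrative map above, the parser resolves a JDBC URL and an auth type. A minimal worked sketch under those assumptions (the expected URL follows the createJdbcUrl logic above):

    // Sketch: URL construction and auth-type resolution for the illustrative params.
    val url = JDBCMultiDatasourceParser.createJdbcUrl("mysql", dbConnParams)
    // url == "jdbc:mysql://localhost:3306/my_db?useSSL=false"
    val dsConnInfo = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, new java.util.HashMap[String, String]())
    // username and password are set, so the auth type resolves to JdbcAuthType.USERNAME and the
    // credentials are copied into dsConnInfo along with JDBC_AUTH_TYPE.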
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
new file mode 100644
index 000000000..a425ea867
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.executer
+
+import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
+import org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType
+
+import java.util
+import org.junit.jupiter.api.{BeforeEach, DisplayName, Test}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+
+
+class JDBCMultiDatasourceParserTest {
+
+  val dbType = "mysql"
+  val dbConnParams: util.Map[String, Object] = new util.HashMap[String, Object]()
+  val datasource: DataSource = new DataSource()
+
+  @BeforeEach
+  def initDatasourceConnParams(): Unit = {
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_HOST, "localhost")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_PORT, "3306")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_USERNAME, "username")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_PASSWORD, "password")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_DB_NAME, "dbName")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_DRIVER, "com.mysql.jdbc.Driver")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_PARAMS, "{\"useSSL\": \"false\"}")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS, "false")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PRINCIPAL, "hadoop@com")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_KERBEROS_KEYTAB, "/data/linkis/keytab/hadoop.keytab")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER, "true")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PROXY_USER_PROPERTY, "hive.server2.proxy.user")
+
+    val dataSourceType = new DataSourceType()
+    dataSourceType.setName(dbType)
+    datasource.setDataSourceType(dataSourceType)
+    datasource.setConnectParams(dbConnParams)
+    datasource.setPublishedVersionId(1L)
+    datasource.setExpire(false)
+  }
+
+  @Test
+  @DisplayName("testCreateJdbcUrl")
+  def testCreateJdbcUrl(): Unit = {
+    val url1 = JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams)
+    assertTrue(url1 != null && "jdbc:mysql://localhost:3306/dbName?useSSL=false".equals(url1))
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_DB_NAME, "")
+    val url2 = JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams)
+    assertTrue(url2 != null && "jdbc:mysql://localhost:3306?useSSL=false".equals(url2))
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_HOST, "")
+    try {
+      JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams)
+    } catch {
+      case _: Throwable => assertTrue(true) // expected: a blank host must be rejected
+    }
+  }
+
+  @Test
+  @DisplayName("testAppendJdbcAuthType")
+  def testAppendJdbcAuthType(): Unit = {
+    var dsConnInfo = new util.HashMap[String, String]()
+    dsConnInfo = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, dsConnInfo)
+    assertTrue(dsConnInfo.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.USERNAME.getAuthType))
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_USERNAME, "")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_PASSWORD, "")
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS, "true")
+    dsConnInfo = new util.HashMap[String, String]()
+    dsConnInfo = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, dsConnInfo)
+    assertTrue(dsConnInfo.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.KERBEROS.getAuthType))
+    dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER, "false")
+    dsConnInfo = new util.HashMap[String, String]()
+    dsConnInfo = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, dsConnInfo)
+    assertTrue(dsConnInfo.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE).equals("false"))
+  }
+
+  @Test
+  @DisplayName("testQueryDatasourceInfo")
+  def testQueryDatasourceInfo(): Unit = {
+    val dataSourceInfo = JDBCMultiDatasourceParser.queryDatasourceInfo("test_mysql", datasource)
+    val jdbcUrl = dataSourceInfo.get(JDBCEngineConnConstant.JDBC_URL)
+    assertTrue(jdbcUrl != null && "jdbc:mysql://localhost:3306/dbName?useSSL=false".equals(jdbcUrl))
+    assertTrue(dataSourceInfo.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.USERNAME.getAuthType))
+  }
+
+  @Test
+  @DisplayName("testMapPutAll")
+  def testMapPutAll(): Unit = {
+    val globalConfig: util.Map[String, String] = new util.HashMap[String, String]()
+    globalConfig.put(JDBCEngineConnConstant.JDBC_URL, "jdbc:mysql://localhost:3306/dbName?useSSL=false")
+    globalConfig.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, JdbcAuthType.SIMPLE.getAuthType)
+    val dataSourceInfo: util.Map[String, String] = new util.HashMap[String, String]()
+    dataSourceInfo.put(JDBCEngineConnConstant.JDBC_URL, "jdbc:mysql://127.0.0.1:3306/dbName?useSSL=false")
+    dataSourceInfo.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, JdbcAuthType.USERNAME.getAuthType)
+    dataSourceInfo.put(JDBCEngineConnConstant.JDBC_USERNAME, "user")
+    dataSourceInfo.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password")
+    globalConfig.putAll(dataSourceInfo)
+    assertTrue(globalConfig.size() == 4)
+    assertTrue(globalConfig.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.USERNAME.getAuthType))
+  }
+}

