You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/06/14 02:47:57 UTC
[incubator-linkis] branch dev-1.2.0 updated: [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new ae61ee927 [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)
ae61ee927 is described below
commit ae61ee9272ef40362542b0d8b3f350db6b521092
Author: weixiao <le...@gmail.com>
AuthorDate: Tue Jun 14 10:47:51 2022 +0800
[Feature] [1.2.0] Linkis jdbc engine supports multiple data source links. (#2260)
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links.
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize code.
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize linkis_dml.sql.
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and optimize code format.
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and add cn and en data source form label or description.
* [Feature] [1.2.0] Linkis jdbc engine supports multiple data source links and add cn and en data source form label or description.
---
db/linkis_dml.sql | 14 +-
.../engineconn-plugins/jdbc/pom.xml | 20 +++
.../jdbc/JDBCDataSourceConfigurations.java | 13 +-
.../engineplugin/jdbc/PropertiesParser.java | 6 +
.../jdbc/constant/JDBCEngineConnConstant.java | 15 ++
.../jdbc/executer/JDBCEngineConnExecutor.scala | 26 ++-
.../jdbc/executer/JDBCMultiDatasourceParser.scala | 175 +++++++++++++++++++++
.../executer/JDBCMultiDatasourceParserTest.scala | 116 ++++++++++++++
8 files changed, 372 insertions(+), 13 deletions(-)
diff --git a/db/linkis_dml.sql b/db/linkis_dml.sql
index 2b0942913..a268bc5a0 100644
--- a/db/linkis_dml.sql
+++ b/db/linkis_dml.sql
@@ -456,12 +456,14 @@ INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `cl
INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('mongodb', 'default', 'default', 'DEFAULT', NULL, 3);
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'host', 'Host', NULL, 'TEXT', NULL, 1, 'mysql Host ', NULL, NULL, NULL, NULL, now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'port', '端口', NULL, 'TEXT', NULL, 1, '端口', NULL, NULL, NULL, NULL, now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'params', '连接参数', NULL, 'TEXT', NULL, 0, '输入JSON格式: {"param":"value"}', NULL, NULL, NULL, NULL, now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'username', '用户名', NULL, 'TEXT', NULL, 1, '用户名', '^[0-9A-Za-z_-]+$', NULL, NULL, NULL, now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'password', '密码', NULL, 'PASSWORD', NULL, 1, '密码', '', NULL, NULL, NULL, now(), now());
-INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (4, 'envId', '集群环境', NULL, 'SELECT', NULL, 1, '集群环境', NULL, NULL, NULL, '/data-source-manager/env-list/all/type/4', now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'host', '主机名(Host)', NULL, 'TEXT', NULL, 1, '主机名(Host)', NULL, NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'port', '端口号(Port)', NULL, 'TEXT', NULL, 1, '端口号(Port)', NULL, NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'driverClassName', '驱动类名(Driver class name)', 'com.mysql.jdbc.Driver', 'TEXT', NULL, 1, '驱动类名(Driver class name)', NULL, NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'params', '连接参数(Connection params)', NULL, 'TEXT', NULL, 0, '输入JSON格式(Input JSON format): {"param":"value"}', NULL, NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'username', '用户名(Username)', NULL, 'TEXT', NULL, 1, '用户名(Username)', '^[0-9A-Za-z_-]+$', NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'password', '密码(Password)', NULL, 'PASSWORD', NULL, 1, '密码(Password)', '', NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (1, 'databaseName', '数据库名(Database name)', NULL, 'TEXT', NULL, 0, '数据库名(Database name)', NULL, NULL, NULL, NULL, now(), now());
+INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `default_value`, `value_type`, `scope`, `require`, `description`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (4, 'envId', '集群环境(Cluster env)', NULL, 'SELECT', NULL, 1, '集群环境(Cluster env)', NULL, NULL, NULL, '/data-source-manager/env-list/all/type/4', now(), now());
INSERT INTO `linkis_ps_dm_datasource_env` (`env_name`, `env_desc`, `datasource_type_id`, `parameter`, `create_time`, `create_user`, `modify_time`, `modify_user`) VALUES ('测试环境SIT', '测试环境SIT', 4, '{"uris":"thrift://localhost:9083", "hadoopConf":{"hive.metastore.execute.setugi":"true"}}', now(), NULL, now(), NULL);
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
index 5a28641d0..461a3f56e 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/pom.xml
@@ -178,6 +178,26 @@
<version>${druid.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-datasource-client</artifactId>
+ <version>${linkis.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
<build>
<plugins>
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
index 8b922f2dc..c13939b1f 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/JDBCDataSourceConfigurations.java
@@ -17,12 +17,16 @@
package org.apache.linkis.manager.engineplugin.jdbc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class JDBCDataSourceConfigurations {
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCDataSourceConfigurations.class);
private final Map<String, Statement> taskIdStatementMap;
public JDBCDataSourceConfigurations() {
@@ -36,6 +40,7 @@ public class JDBCDataSourceConfigurations {
}
}
taskIdStatementMap.clear();
+ LOG.info("The jdbc task statement map has be cleared successfully!");
}
public void saveStatement(String taskId, Statement statement) {
@@ -43,10 +48,16 @@ public class JDBCDataSourceConfigurations {
}
public void cancelStatement(String taskId) throws SQLException {
- taskIdStatementMap.get(taskId).cancel();
+ LOG.info("Starting to cancel the statement of task {} ...", taskId);
+ Statement statement = taskIdStatementMap.get(taskId);
+ if (statement != null) {
+ statement.cancel();
+ }
+ LOG.info("Finished cancel the statement of task {}.", taskId);
}
public void removeStatement(String taskId) {
taskIdStatementMap.remove(taskId);
+ LOG.info("Finished remove the statement of task {}", taskId);
}
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
index 5bc15dcab..2ab943e56 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/PropertiesParser.java
@@ -23,6 +23,12 @@ import java.util.Map;
public abstract class PropertiesParser {
interface TypeConversion<T> {
+ /**
+ * String type data is converted to T type
+ *
+ * @param oriV origin type
+ * @return T which is target type
+ */
T convertTo(String oriV);
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index 0ccc4b49e..305643f67 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -36,6 +36,8 @@ public class JDBCEngineConnConstant {
public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
public static final String JDBC_SCRIPTS_EXEC_USER = "execUser";
public static final String JDBC_ENGINE_RUN_TIME_DS = "wds.linkis.engine.runtime.datasource";
+ public static final String JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM =
+ "wds.linkis.engine.runtime.datasource.systemQueryParam";
public static final String JDBC_POOL_TEST_ON_BORROW = "wds.linkis.jdbc.pool.testOnBorrow";
public static final String JDBC_POOL_TEST_ON_RETURN = "wds.linkis.jdbc.pool.testOnReturn";
@@ -57,5 +59,18 @@ public class JDBCEngineConnConstant {
public static final String JDBC_POOL_REMOVE_ABANDONED_TIMEOUT =
"wds.linkis.jdbc.pool.remove.abandoned.timeout";
+ public static final String DS_JDBC_HOST = "host";
+ public static final String DS_JDBC_PORT = "port";
+ public static final String DS_JDBC_DB_NAME = "databaseName";
+ public static final String DS_JDBC_USERNAME = "username";
+ public static final String DS_JDBC_PASSWORD = "password";
+ public static final String DS_JDBC_ENABLE_KERBEROS = "enableKerberos";
+ public static final String DS_JDBC_KERBEROS_PRINCIPAL = "kerberosPrincipal";
+ public static final String DS_JDBC_KERBEROS_KEYTAB = "kerberosKeytab";
+ public static final String DS_JDBC_ENABLE_KERBEROS_PROXY_USER = "enableKerberosProxyUser";
+ public static final String DS_JDBC_KERBEROS_PROXY_USER_PROPERTY = "kerberosProxyUserProperty";
+ public static final String DS_JDBC_PARAMS = "params";
+ public static final String DS_JDBC_DRIVER = "driverClassName";
+
public static final String JDBC_ENGINE_MEMORY_UNIT = "g";
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 3bb05236a..4a84fb154 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -44,6 +44,7 @@ import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
import scala.collection.JavaConversions._
+
import scala.collection.mutable.ArrayBuffer
class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) extends ConcurrentComputationExecutor(outputPrintLimit) {
@@ -68,13 +69,21 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
val taskId = engineExecutorContext.getJobId.get
val properties = engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
var dataSourceName = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
+ val dataSourceQuerySystemParam = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM, "")
if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
info(s"The jdbc url is empty, adding now...")
- val globalConfig = Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
+ val globalConfig: util.Map[String, String] = Utils.tryAndWarn(JDBCEngineConfig.getCacheMap(engineExecutorContext.getLabels))
if (StringUtils.isNotBlank(dataSourceName)) {
info("Start getting data source connection parameters from the data source hub.")
- // todo get data source info by data source client
+ Utils.tryCatch {
+ val dataSourceInfo = JDBCMultiDatasourceParser.queryDatasourceInfoByName(dataSourceName, execSqlUser, dataSourceQuerySystemParam)
+ if (dataSourceInfo != null && !dataSourceInfo.isEmpty) {
+ globalConfig.putAll(dataSourceInfo)
+ }
+ } {
+ e: Throwable => return ErrorExecuteResponse(s"Failed to get datasource info about [$dataSourceName] from datasource server.", e)
+ }
}
properties.put(JDBCEngineConnConstant.JDBC_URL, globalConfig.get(JDBCEngineConnConstant.JDBC_URL))
properties.put(JDBCEngineConnConstant.JDBC_DRIVER, globalConfig.get(JDBCEngineConnConstant.JDBC_DRIVER))
@@ -95,16 +104,19 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
var connection: Connection = null
var statement: Statement = null
var resultSet: ResultSet = null
-
+ info(s"The data source properties is $properties")
Utils.tryCatch({
connection = connectionManager.getConnection(dataSourceName, properties)
+ info("The jdbc connection has created successfully!")
}) {
- case e: Exception => return ErrorExecuteResponse("created data source connection error.", e)
+ e: Throwable =>
+ error(s"created data source connection error! $e")
+ return ErrorExecuteResponse("created data source connection error!", e)
}
try {
- statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
statement = connection.createStatement()
+ statement.setQueryTimeout(JDBCConfiguration.JDBC_QUERY_TIMEOUT.getValue)
statement.setFetchSize(outputPrintLimit)
statement.setMaxRows(outputPrintLimit)
info(s"create statement is: $statement")
@@ -131,7 +143,9 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
}
}
} catch {
- case e: Throwable => error(s"Cannot run $code", e)
+ case e: Throwable =>
+ error(s"Cannot run $code", e)
+ return ErrorExecuteResponse(e.getMessage, e)
} finally {
if (connection != null) {
try {
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
new file mode 100644
index 000000000..120f1e470
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.executer
+
+import java.util
+import org.apache.commons.lang.StringUtils
+import org.apache.linkis.common.utils.{JsonUtils, Logging, Utils}
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient
+import org.apache.linkis.datasource.client.request.GetInfoByDataSourceNameAction
+import org.apache.linkis.datasourcemanager.common.domain.DataSource
+import org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType
+import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+import org.apache.linkis.manager.engineplugin.jdbc.exception.JDBCParamsIllegalException
+import scala.collection.JavaConversions._
+
+object JDBCMultiDatasourceParser extends Logging {
+
+ /**
+  * Looks up a data source by name through a newly created LinkisDataSourceRemoteClient and
+  * turns the result into JDBC connection settings via queryDatasourceInfo.
+  *
+  * An Exception raised while building or sending the request is logged as a warning; the
+  * lookup result is then null and queryDatasourceInfo rejects it as "not found".
+  *
+  * @param datasourceName name of the data source to look up
+  * @param username       user set on the lookup request
+  * @param system         system identifier set on the lookup request
+  * @return the connection settings produced by queryDatasourceInfo
+  */
+ def queryDatasourceInfoByName(datasourceName: String, username: String, system: String): util.Map[String, String] = {
+   info(s"Starting query [$system, $username, $datasourceName] datasource info ......")
+   val client = new LinkisDataSourceRemoteClient()
+   val fetched: DataSource = Utils.tryCatch {
+     // The request is built inside the guarded block so builder failures are handled the same way.
+     val request = GetInfoByDataSourceNameAction.builder()
+       .setSystem(system)
+       .setDataSourceName(datasourceName)
+       .setUser(username)
+       .build()
+     client.getInfoByDataSourceName(request).getDataSource
+   } {
+     case e: Exception =>
+       warn(s"Get data source info error, $e")
+       null
+   }
+   queryDatasourceInfo(datasourceName, fetched)
+ }
+
+ /**
+  * Validates a data source definition and derives the JDBC settings needed to connect to it.
+  *
+  * The checks run in this order: the data source must be present, published (published version
+  * id greater than 0), not expired, carry a non-blank type name, have non-empty connection
+  * params and a non-blank driver class name. The returned map holds the JDBC url, the driver
+  * class name and whatever appendJdbcAuthType adds.
+  *
+  * @param datasourceName name used in the error messages
+  * @param dataSource     data source definition to validate; may be null
+  * @return the map of JDBC connection settings
+  * @throws JDBCParamsIllegalException when any of the checks above fails
+  */
+ def queryDatasourceInfo(datasourceName: String, dataSource: DataSource): util.Map[String, String] = {
+   if (strObjIsBlank(dataSource)) {
+     throw JDBCParamsIllegalException(s"Data source [$datasourceName] info not found!")
+   }
+
+   val publishedVersion = dataSource.getPublishedVersionId
+   if (publishedVersion == null || publishedVersion <= 0) {
+     throw JDBCParamsIllegalException(s"Data source [$datasourceName] not yet published!")
+   }
+
+   if (dataSource.isExpire) {
+     throw JDBCParamsIllegalException(s"Data source [$datasourceName] is expired!")
+   }
+
+   val sourceType = dataSource.getDataSourceType
+   if (sourceType == null || StringUtils.isBlank(sourceType.getName)) {
+     throw JDBCParamsIllegalException("The data source jdbc type cannot be null!")
+   }
+
+   val connectParams = dataSource.getConnectParams
+   if (connectParams == null || connectParams.isEmpty) {
+     throw JDBCParamsIllegalException("The data source jdbc connection info cannot be null!")
+   }
+
+   val driver = connectParams.get(JDBCEngineConnConstant.DS_JDBC_DRIVER)
+   if (strObjIsBlank(driver)) {
+     throw JDBCParamsIllegalException("The data source jdbc driverClassName cannot be null!")
+   }
+
+   val url = createJdbcUrl(sourceType.getName, connectParams)
+   info(s"The url parsed from the data source connection information is $url")
+
+   // The result map is only created once every validation has passed.
+   val settings = new util.HashMap[String, String]()
+   settings.put(JDBCEngineConnConstant.JDBC_URL, url)
+   settings.put(JDBCEngineConnConstant.JDBC_DRIVER, driver.toString)
+   appendJdbcAuthType(connectParams, settings)
+ }
+
+ /**
+  * Builds a JDBC url of the form
+  * `jdbc:<dbType>://<host>:<port>[/<databaseName>][?k1=v1&k2=v2...]` from the connection params.
+  *
+  * The database name and the query string are optional. The query string comes from the
+  * `params` entry, which is expected to hold a JSON object; its entries are appended in the
+  * iteration order of the parsed map. Keys and values are not url-encoded.
+  *
+  * @param dbType       database type name placed after `jdbc:`
+  * @param dbConnParams connection params of the data source
+  * @return the assembled JDBC url
+  * @throws JDBCParamsIllegalException when the host or the port is missing or blank
+  */
+ def createJdbcUrl(dbType: String, dbConnParams: util.Map[String, Object]): String = {
+   val host = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_HOST)
+   if (strObjIsBlank(host)) {
+     throw JDBCParamsIllegalException("The data source jdbc connection host cannot be null!")
+   }
+   val port = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PORT)
+   if (strObjIsBlank(port)) {
+     throw JDBCParamsIllegalException("The data source jdbc connection port cannot be null!")
+   }
+
+   val dbName = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_DB_NAME)
+   val baseUrl =
+     if (strObjIsNotBlank(dbName)) s"jdbc:$dbType://$host:$port/$dbName"
+     else s"jdbc:$dbType://$host:$port"
+
+   val params = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PARAMS)
+   val paramsMap = if (strObjIsNotBlank(params)) convertJsonStrToMap(params.toString) else new util.HashMap[String, Object]()
+
+   // Join all entries in one pass. The earlier code applied `.toList.mkString("&")` to each
+   // "k=v" string (splitting it into characters) and then interpolated the resulting collection,
+   // which corrupted the url as soon as more than one parameter was configured.
+   val query = paramsMap.map { case (k, v) => s"$k=$v" }.mkString("&")
+
+   if (query.isEmpty) baseUrl else s"$baseUrl?$query"
+ }
+
+ /**
+  * Chooses the JDBC authentication type from the connection params and writes the matching
+  * settings, plus the auth type itself, into dsConnInfo.
+  *
+  * Selection rules:
+  *  - username and password both non-blank: USERNAME;
+  *  - otherwise `enableKerberos` non-blank and parsing to true: KERBEROS, which requires a
+  *    non-blank principal and keytab;
+  *  - otherwise: SIMPLE, which adds no credentials.
+  *
+  * @param dbConnParams connection params of the data source
+  * @param dsConnInfo   map that receives the settings; it is modified and returned
+  * @return the same dsConnInfo instance
+  * @throws JDBCParamsIllegalException when kerberos is enabled without principal or keytab
+  */
+ def appendJdbcAuthType(dbConnParams: util.Map[String, Object], dsConnInfo: util.HashMap[String, String]): util.HashMap[String, String] = {
+   val username = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_USERNAME)
+   val password = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_PASSWORD)
+   val kerberosFlag = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS)
+   val principal = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PRINCIPAL)
+   val keytab = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_KEYTAB)
+
+   val authType: JdbcAuthType =
+     if (strObjIsNotBlank(username) && strObjIsNotBlank(password)) {
+       JdbcAuthType.USERNAME
+     } else if (strObjIsNotBlank(kerberosFlag) && kerberosFlag.toString.toBoolean) {
+       // Kerberos needs both pieces before anything is written to dsConnInfo.
+       if (strObjIsBlank(principal)) {
+         throw JDBCParamsIllegalException("In the jdbc authentication mode of kerberos, the kerberos principal cannot be empty!")
+       }
+       if (strObjIsBlank(keytab)) {
+         throw JDBCParamsIllegalException("In the jdbc authentication mode of kerberos, the kerberos keytab cannot be empty!")
+       }
+       JdbcAuthType.KERBEROS
+     } else {
+       JdbcAuthType.SIMPLE
+     }
+
+   authType match {
+     case JdbcAuthType.USERNAME =>
+       dsConnInfo.put(JDBCEngineConnConstant.JDBC_USERNAME, username.toString)
+       dsConnInfo.put(JDBCEngineConnConstant.JDBC_PASSWORD, password.toString)
+     case JdbcAuthType.KERBEROS =>
+       dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL, principal.toString)
+       dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION, keytab.toString)
+       // The proxy-user settings are optional and copied only when present.
+       val proxyEnabled = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER)
+       if (strObjIsNotBlank(proxyEnabled)) {
+         dsConnInfo.put(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE, proxyEnabled.toString)
+       }
+       val proxyProperty = dbConnParams.get(JDBCEngineConnConstant.DS_JDBC_KERBEROS_PROXY_USER_PROPERTY)
+       if (strObjIsNotBlank(proxyProperty)) {
+         dsConnInfo.put(JDBCEngineConnConstant.JDBC_PROXY_USER_PROPERTY, proxyProperty.toString)
+       }
+     case JdbcAuthType.SIMPLE =>
+       info("jdbc simple auth type.")
+     case _ => throw JDBCParamsIllegalException(s"Unsupported authentication type ${authType.getAuthType}")
+   }
+
+   dsConnInfo.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, authType.getAuthType)
+   dsConnInfo
+ }
+
+ /**
+  * Parses a JSON string into a java.util.Map using the shared Jackson mapper from JsonUtils.
+  *
+  * @param jsonStr JSON text to parse
+  * @return the parsed map
+  */
+ private def convertJsonStrToMap(jsonStr: String): util.Map[String, Object] = {
+   val targetType = classOf[util.Map[String, Object]]
+   JsonUtils.jackson.readValue(jsonStr, targetType)
+ }
+
+ /**
+  * Tells whether an object is non-null and its toString value is not blank.
+  *
+  * @param str object to inspect; may be null
+  * @return true when str is non-null and its string form is not blank
+  */
+ private def strObjIsNotBlank(str: Object): Boolean =
+   Option(str).exists(value => StringUtils.isNotBlank(value.toString))
+
+ /**
+  * Tells whether an object is null or its toString value is blank.
+  *
+  * @param str object to inspect; may be null
+  * @return true when str is null or its string form is blank
+  */
+ private def strObjIsBlank(str: Object): Boolean =
+   str == null || StringUtils.isBlank(str.toString)
+}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
new file mode 100644
index 000000000..a425ea867
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.jdbc.executer
+
+import org.apache.linkis.datasourcemanager.common.domain.{DataSource, DataSourceType}
+import org.apache.linkis.manager.engineplugin.jdbc.JdbcAuthType
+
+import java.util
+import org.junit.jupiter.api.{BeforeEach, DisplayName, Test}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.apache.linkis.manager.engineplugin.jdbc.constant.JDBCEngineConnConstant
+
+
+class JDBCMultiDatasourceParserTest {
+
+ // Fixture shared by the tests below; initDatasourceConnParams refills it before each test.
+ val dbType = "mysql"
+ val dbConnParams: util.Map[String, Object] = new util.HashMap[String, Object]()
+ val datasource: DataSource = new DataSource()
+
+ /**
+  * Resets the connection params to a complete MySQL-style configuration and attaches them to a
+  * published, non-expired data source of type dbType.
+  */
+ @BeforeEach
+ def initDatasourceConnParams(): Unit = {
+   val defaults = Seq(
+     JDBCEngineConnConstant.DS_JDBC_HOST -> "localhost",
+     JDBCEngineConnConstant.DS_JDBC_PORT -> "3306",
+     JDBCEngineConnConstant.DS_JDBC_USERNAME -> "username",
+     JDBCEngineConnConstant.DS_JDBC_PASSWORD -> "password",
+     JDBCEngineConnConstant.DS_JDBC_DB_NAME -> "dbName",
+     JDBCEngineConnConstant.DS_JDBC_DRIVER -> "com.mysql.jdbc.Driver",
+     JDBCEngineConnConstant.DS_JDBC_PARAMS -> "{\"useSSL\": \"false\"}",
+     JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS -> "false",
+     JDBCEngineConnConstant.DS_JDBC_KERBEROS_PRINCIPAL -> "hadoop@com",
+     JDBCEngineConnConstant.DS_JDBC_KERBEROS_KEYTAB -> "/data/linkis/keytab/hadoop.keytab",
+     JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER -> "true",
+     JDBCEngineConnConstant.DS_JDBC_KERBEROS_PROXY_USER_PROPERTY -> "hive.server2.proxy.user"
+   )
+   defaults.foreach { case (key, value) => dbConnParams.put(key, value) }
+
+   val sourceType = new DataSourceType()
+   sourceType.setName(dbType)
+   datasource.setDataSourceType(sourceType)
+   datasource.setConnectParams(dbConnParams)
+   datasource.setPublishedVersionId(1L)
+   datasource.setExpire(false)
+ }
+
+ /**
+  * Checks the url produced with and without a database name, and that a blank host is rejected.
+  */
+ @Test
+ @DisplayName("testCreateJdbcUrl")
+ def testCreateJdbcUrl(): Unit = {
+   val url1 = JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams)
+   assertTrue(url1 != null && "jdbc:mysql://localhost:3306/dbName?useSSL=false".equals(url1))
+
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_DB_NAME, "")
+   val url2 = JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams)
+   assertTrue(url2 != null && "jdbc:mysql://localhost:3306?useSSL=false".equals(url2))
+
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_HOST, "")
+   // The earlier try/catch asserted only inside the catch branch, so the test also passed when
+   // no exception was thrown. The failure is now required explicitly.
+   val blankHostAttempt = scala.util.Try(JDBCMultiDatasourceParser.createJdbcUrl(dbType, dbConnParams))
+   assertTrue(blankHostAttempt.isFailure)
+ }
+
+ /**
+  * Walks through the auth type selection: username/password first, then kerberos once the
+  * credentials are blanked, then the kerberos proxy-user flag.
+  */
+ @Test
+ @DisplayName("testAppendJdbcAuthType")
+ def testAppendJdbcAuthType(): Unit = {
+   val usernameAuth = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, new util.HashMap[String, String]())
+   assertTrue(usernameAuth.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.USERNAME.getAuthType))
+
+   // Blank credentials plus an enabled kerberos flag select kerberos.
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_USERNAME, "")
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_PASSWORD, "")
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS, "true")
+   val kerberosAuth = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, new util.HashMap[String, String]())
+   assertTrue(kerberosAuth.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.KERBEROS.getAuthType))
+
+   dbConnParams.put(JDBCEngineConnConstant.DS_JDBC_ENABLE_KERBEROS_PROXY_USER, "false")
+   val proxyDisabled = JDBCMultiDatasourceParser.appendJdbcAuthType(dbConnParams, new util.HashMap[String, String]())
+   assertTrue(proxyDisabled.get(JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE).equals("false"))
+ }
+
+ /**
+  * Checks that the fixture data source yields the expected url and the USERNAME auth type.
+  */
+ @Test
+ @DisplayName("testQueryDatasourceInfo")
+ def testQueryDatasourceInfo(): Unit = {
+   val settings = JDBCMultiDatasourceParser.queryDatasourceInfo("test_mysql", datasource)
+
+   val url = settings.get(JDBCEngineConnConstant.JDBC_URL)
+   assertTrue(url != null && "jdbc:mysql://localhost:3306/dbName?useSSL=false".equals(url))
+
+   val authType = settings.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE)
+   assertTrue(authType.equals(JdbcAuthType.USERNAME.getAuthType))
+ }
+
+ /**
+  * Checks that putAll lets data source settings override the global config entries with the
+  * same key while adding the new ones.
+  */
+ @Test
+ @DisplayName("testMapPutAll")
+ def testMapPutAll(): Unit = {
+   val base: util.Map[String, String] = new util.HashMap[String, String]()
+   base.put(JDBCEngineConnConstant.JDBC_URL, "jdbc:mysql://localhost:3306/dbName?useSSL=false")
+   base.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, JdbcAuthType.SIMPLE.getAuthType)
+
+   val overrides: util.Map[String, String] = new util.HashMap[String, String]()
+   overrides.put(JDBCEngineConnConstant.JDBC_URL, "jdbc:mysql://127.0.0.1:3306/dbName?useSSL=false")
+   overrides.put(JDBCEngineConnConstant.JDBC_AUTH_TYPE, JdbcAuthType.USERNAME.getAuthType)
+   overrides.put(JDBCEngineConnConstant.JDBC_USERNAME, "user")
+   overrides.put(JDBCEngineConnConstant.JDBC_PASSWORD, "password")
+
+   base.putAll(overrides)
+
+   // Two keys overlap and two are new, so four entries remain.
+   assertTrue(base.size() == 4)
+   assertTrue(base.get(JDBCEngineConnConstant.JDBC_AUTH_TYPE).equals(JdbcAuthType.USERNAME.getAuthType))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org