You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/13 06:12:20 UTC

[incubator-linkis] branch dev-1.2.0 updated: [Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
     new 11914fb94 [Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)
11914fb94 is described below

commit 11914fb94290ba77d14210a88e044ed495f7ba08
Author: weixiao <le...@gmail.com>
AuthorDate: Sat Aug 13 14:12:14 2022 +0800

    [Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)
---
 .../engineplugin/jdbc/ConnectionManager.java       | 32 +++++++++++-----------
 .../jdbc/constant/JDBCEngineConnConstant.java      |  2 ++
 .../jdbc/executer/JDBCEngineConnExecutor.scala     |  5 +++-
 .../jdbc/executer/JDBCMultiDatasourceParser.scala  |  3 ++
 .../engineplugin/jdbc/ConnectionManagerTest.java   |  2 +-
 .../executer/JDBCMultiDatasourceParserTest.scala   |  1 +
 6 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index b904d74fc..6bfd5dad2 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -217,68 +217,68 @@ public class ConnectionManager {
     }
 
     private Connection getConnectionFromDataSource(
-            String dataSourceName, String url, Map<String, String> prop)
+            String dataSourceIdentifier, String url, Map<String, String> prop)
             throws SQLException, JDBCParamsIllegalException {
-        DataSource dataSource = dataSourceFactories.get(dataSourceName);
+        DataSource dataSource = dataSourceFactories.get(dataSourceIdentifier);
         if (dataSource == null) {
             synchronized (dataSourceFactories) {
                 if (dataSource == null) {
                     dataSource = buildDataSource(url, prop);
-                    dataSourceFactories.put(dataSourceName, dataSource);
+                    dataSourceFactories.put(dataSourceIdentifier, dataSource);
                 }
             }
         }
         return dataSource.getConnection();
     }
 
-    public Connection getConnection(String dataSourceName, Map<String, String> propperties)
+    public Connection getConnection(String dataSourceIdentifier, Map<String, String> properties)
             throws SQLException, JDBCParamsIllegalException {
         String execUser =
                 JDBCPropertiesParser.getString(
-                        propperties, JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
+                        properties, JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
         if (StringUtils.isBlank(execUser)) {
             LOG.warn("No such execUser: {}", execUser);
             throw new JDBCParamsIllegalException("No execUser");
         }
         Connection connection = null;
-        final String jdbcUrl = getJdbcUrl(propperties);
-        JdbcAuthType jdbcAuthType = getJdbcAuthType(propperties);
+        final String jdbcUrl = getJdbcUrl(properties);
+        JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
         switch (jdbcAuthType) {
             case SIMPLE:
             case USERNAME:
-                connection = getConnectionFromDataSource(dataSourceName, jdbcUrl, propperties);
+                connection = getConnectionFromDataSource(dataSourceIdentifier, jdbcUrl, properties);
                 break;
             case KERBEROS:
                 LOG.debug(
                         "Calling createKerberosSecureConfiguration(); this will do loginUserFromKeytab() if required");
                 final String keytab =
                         JDBCPropertiesParser.getString(
-                                propperties,
+                                properties,
                                 JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
                                 "");
                 final String principal =
                         JDBCPropertiesParser.getString(
-                                propperties,
+                                properties,
                                 JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL,
                                 "");
                 KerberosUtils.createKerberosSecureConfiguration(keytab, principal);
                 LOG.debug("createKerberosSecureConfiguration() returned");
                 boolean isProxyEnabled =
                         JDBCPropertiesParser.getBool(
-                                propperties,
+                                properties,
                                 JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE,
                                 true);
 
                 if (isProxyEnabled) {
                     final String jdbcUrlWithProxyUser =
-                            appendProxyUserToJDBCUrl(jdbcUrl, execUser, propperties);
+                            appendProxyUserToJDBCUrl(jdbcUrl, execUser, properties);
                     LOG.info(
                             String.format(
                                     "Try to Create a new %s JDBC with url(%s), kerberos, proxyUser(%s).",
-                                    dataSourceName, jdbcUrlWithProxyUser, execUser));
+                                    dataSourceIdentifier, jdbcUrlWithProxyUser, execUser));
                     connection =
                             getConnectionFromDataSource(
-                                    dataSourceName, jdbcUrlWithProxyUser, propperties);
+                                    dataSourceIdentifier, jdbcUrlWithProxyUser, properties);
                 } else {
                     UserGroupInformation ugi;
                     try {
@@ -296,9 +296,9 @@ public class ConnectionManager {
                                         (PrivilegedExceptionAction<Connection>)
                                                 () ->
                                                         getConnectionFromDataSource(
-                                                                dataSourceName,
+                                                                dataSourceIdentifier,
                                                                 jdbcUrl,
-                                                                propperties));
+                                                                properties));
                     } catch (Exception e) {
                         throw new JDBCParamsIllegalException(
                                 "Error in doAs to get one connection.");
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index 305643f67..2500c6e4a 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -36,6 +36,8 @@ public class JDBCEngineConnConstant {
     public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
     public static final String JDBC_SCRIPTS_EXEC_USER = "execUser";
     public static final String JDBC_ENGINE_RUN_TIME_DS = "wds.linkis.engine.runtime.datasource";
+    public static final String JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID =
+            "wds.linkis.engine.runtime.datasource.maxVersionId";
     public static final String JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM =
             "wds.linkis.engine.runtime.datasource.systemQueryParam";
 
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 79bf23af4..89b9f56ee 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -69,6 +69,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
     val taskId = engineExecutorContext.getJobId.get
     val properties = engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
     var dataSourceName = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
+    var dataSourceMaxVersionId: Int = 0
     val dataSourceQuerySystemParam = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM, "")
 
     if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
@@ -79,6 +80,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
         Utils.tryCatch {
           val dataSourceInfo = JDBCMultiDatasourceParser.queryDatasourceInfoByName(dataSourceName, execSqlUser, dataSourceQuerySystemParam)
           if (dataSourceInfo != null && !dataSourceInfo.isEmpty) {
+            dataSourceMaxVersionId = dataSourceInfo.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID).toInt
             globalConfig.putAll(dataSourceInfo)
           }
         } {
@@ -106,7 +108,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
     var resultSet: ResultSet = null
     logger.info(s"The data source properties is $properties")
     Utils.tryCatch({
-      connection = connectionManager.getConnection(dataSourceName, properties)
+      val dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId"
+      connection = connectionManager.getConnection(dataSourceIdentifier, properties)
       logger.info("The jdbc connection has created successfully!")
     }) {
       e: Throwable =>
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
index 13cfd842e..012851141 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
@@ -57,6 +57,9 @@ object JDBCMultiDatasourceParser extends Logging {
       throw JDBCParamsIllegalException(s"Data source [$datasourceName] not yet published!")
     }
 
+    val maxVersionId = dataSource.getVersionId.toString
+    dsConnInfo.put(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID, maxVersionId)
+
     if (dataSource.isExpire) {
       throw JDBCParamsIllegalException(s"Data source [$datasourceName] is expired!")
     }
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index bb448d853..5dc1a5712 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -51,7 +51,7 @@ public class ConnectionManagerTest {
         properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
         properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "leo_jie");
         ConnectionManager connectionManager = ConnectionManager.getInstance();
-        Connection conn = connectionManager.getConnection("jdbc", properties);
+        Connection conn = connectionManager.getConnection("jdbc-1", properties);
         Statement statement = conn.createStatement();
         ResultSet rs = statement.executeQuery("show databases;");
         while (rs.next()) {
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
index a425ea867..dabf81ef7 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
@@ -52,6 +52,7 @@ class JDBCMultiDatasourceParserTest {
     datasource.setDataSourceType(dataSourceType)
     datasource.setConnectParams(dbConnParams)
     datasource.setPublishedVersionId(1L)
+    datasource.setVersionId(1L)
     datasource.setExpire(false)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org