You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/08/13 06:12:20 UTC
[incubator-linkis] branch dev-1.2.0 updated: [Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new 11914fb94 [Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)
11914fb94 is described below
commit 11914fb94290ba77d14210a88e044ed495f7ba08
Author: weixiao <le...@gmail.com>
AuthorDate: Sat Aug 13 14:12:14 2022 +0800
[Feature] The connection cache pool key value in the ConnectionManager is adjusted to the data source name + version number. (#2742)
---
.../engineplugin/jdbc/ConnectionManager.java | 32 +++++++++++-----------
.../jdbc/constant/JDBCEngineConnConstant.java | 2 ++
.../jdbc/executer/JDBCEngineConnExecutor.scala | 5 +++-
.../jdbc/executer/JDBCMultiDatasourceParser.scala | 3 ++
.../engineplugin/jdbc/ConnectionManagerTest.java | 2 +-
.../executer/JDBCMultiDatasourceParserTest.scala | 1 +
6 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
index b904d74fc..6bfd5dad2 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManager.java
@@ -217,68 +217,68 @@ public class ConnectionManager {
}
private Connection getConnectionFromDataSource(
- String dataSourceName, String url, Map<String, String> prop)
+ String dataSourceIdentifier, String url, Map<String, String> prop)
throws SQLException, JDBCParamsIllegalException {
- DataSource dataSource = dataSourceFactories.get(dataSourceName);
+ DataSource dataSource = dataSourceFactories.get(dataSourceIdentifier);
if (dataSource == null) {
synchronized (dataSourceFactories) {
if (dataSource == null) {
dataSource = buildDataSource(url, prop);
- dataSourceFactories.put(dataSourceName, dataSource);
+ dataSourceFactories.put(dataSourceIdentifier, dataSource);
}
}
}
return dataSource.getConnection();
}
- public Connection getConnection(String dataSourceName, Map<String, String> propperties)
+ public Connection getConnection(String dataSourceIdentifier, Map<String, String> properties)
throws SQLException, JDBCParamsIllegalException {
String execUser =
JDBCPropertiesParser.getString(
- propperties, JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
+ properties, JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "");
if (StringUtils.isBlank(execUser)) {
LOG.warn("No such execUser: {}", execUser);
throw new JDBCParamsIllegalException("No execUser");
}
Connection connection = null;
- final String jdbcUrl = getJdbcUrl(propperties);
- JdbcAuthType jdbcAuthType = getJdbcAuthType(propperties);
+ final String jdbcUrl = getJdbcUrl(properties);
+ JdbcAuthType jdbcAuthType = getJdbcAuthType(properties);
switch (jdbcAuthType) {
case SIMPLE:
case USERNAME:
- connection = getConnectionFromDataSource(dataSourceName, jdbcUrl, propperties);
+ connection = getConnectionFromDataSource(dataSourceIdentifier, jdbcUrl, properties);
break;
case KERBEROS:
LOG.debug(
"Calling createKerberosSecureConfiguration(); this will do loginUserFromKeytab() if required");
final String keytab =
JDBCPropertiesParser.getString(
- propperties,
+ properties,
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_KEYTAB_LOCATION,
"");
final String principal =
JDBCPropertiesParser.getString(
- propperties,
+ properties,
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_TYPE_PRINCIPAL,
"");
KerberosUtils.createKerberosSecureConfiguration(keytab, principal);
LOG.debug("createKerberosSecureConfiguration() returned");
boolean isProxyEnabled =
JDBCPropertiesParser.getBool(
- propperties,
+ properties,
JDBCEngineConnConstant.JDBC_KERBEROS_AUTH_PROXY_ENABLE,
true);
if (isProxyEnabled) {
final String jdbcUrlWithProxyUser =
- appendProxyUserToJDBCUrl(jdbcUrl, execUser, propperties);
+ appendProxyUserToJDBCUrl(jdbcUrl, execUser, properties);
LOG.info(
String.format(
"Try to Create a new %s JDBC with url(%s), kerberos, proxyUser(%s).",
- dataSourceName, jdbcUrlWithProxyUser, execUser));
+ dataSourceIdentifier, jdbcUrlWithProxyUser, execUser));
connection =
getConnectionFromDataSource(
- dataSourceName, jdbcUrlWithProxyUser, propperties);
+ dataSourceIdentifier, jdbcUrlWithProxyUser, properties);
} else {
UserGroupInformation ugi;
try {
@@ -296,9 +296,9 @@ public class ConnectionManager {
(PrivilegedExceptionAction<Connection>)
() ->
getConnectionFromDataSource(
- dataSourceName,
+ dataSourceIdentifier,
jdbcUrl,
- propperties));
+ properties));
} catch (Exception e) {
throw new JDBCParamsIllegalException(
"Error in doAs to get one connection.");
diff --git a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
index 305643f67..2500c6e4a 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
+++ b/linkis-engineconn-plugins/jdbc/src/main/java/org/apache/linkis/manager/engineplugin/jdbc/constant/JDBCEngineConnConstant.java
@@ -36,6 +36,8 @@ public class JDBCEngineConnConstant {
public static final String JDBC_PROXY_USER = "wds.linkis.jdbc.proxy.user";
public static final String JDBC_SCRIPTS_EXEC_USER = "execUser";
public static final String JDBC_ENGINE_RUN_TIME_DS = "wds.linkis.engine.runtime.datasource";
+ public static final String JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID =
+ "wds.linkis.engine.runtime.datasource.maxVersionId";
public static final String JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM =
"wds.linkis.engine.runtime.datasource.systemQueryParam";
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
index 79bf23af4..89b9f56ee 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCEngineConnExecutor.scala
@@ -69,6 +69,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
val taskId = engineExecutorContext.getJobId.get
val properties = engineExecutorContext.getProperties.asInstanceOf[util.Map[String, String]]
var dataSourceName = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS, "")
+ var dataSourceMaxVersionId: Int = 0
val dataSourceQuerySystemParam = properties.getOrDefault(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_SYSTEM_QUERY_PARAM, "")
if (properties.get(JDBCEngineConnConstant.JDBC_URL) == null) {
@@ -79,6 +80,7 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
Utils.tryCatch {
val dataSourceInfo = JDBCMultiDatasourceParser.queryDatasourceInfoByName(dataSourceName, execSqlUser, dataSourceQuerySystemParam)
if (dataSourceInfo != null && !dataSourceInfo.isEmpty) {
+ dataSourceMaxVersionId = dataSourceInfo.get(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID).toInt
globalConfig.putAll(dataSourceInfo)
}
} {
@@ -106,7 +108,8 @@ class JDBCEngineConnExecutor(override val outputPrintLimit: Int, val id: Int) ex
var resultSet: ResultSet = null
logger.info(s"The data source properties is $properties")
Utils.tryCatch({
- connection = connectionManager.getConnection(dataSourceName, properties)
+ val dataSourceIdentifier = s"$dataSourceName-$dataSourceMaxVersionId"
+ connection = connectionManager.getConnection(dataSourceIdentifier, properties)
logger.info("The jdbc connection has created successfully!")
}) {
e: Throwable =>
diff --git a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
index 13cfd842e..012851141 100644
--- a/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
+++ b/linkis-engineconn-plugins/jdbc/src/main/scala/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParser.scala
@@ -57,6 +57,9 @@ object JDBCMultiDatasourceParser extends Logging {
throw JDBCParamsIllegalException(s"Data source [$datasourceName] not yet published!")
}
+ val maxVersionId = dataSource.getVersionId.toString
+ dsConnInfo.put(JDBCEngineConnConstant.JDBC_ENGINE_RUN_TIME_DS_MAX_VERSION_ID, maxVersionId)
+
if (dataSource.isExpire) {
throw JDBCParamsIllegalException(s"Data source [$datasourceName] is expired!")
}
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
index bb448d853..5dc1a5712 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/ConnectionManagerTest.java
@@ -51,7 +51,7 @@ public class ConnectionManagerTest {
properties.put(JDBCEngineConnConstant.JDBC_PROXY_USER, "");
properties.put(JDBCEngineConnConstant.JDBC_SCRIPTS_EXEC_USER, "leo_jie");
ConnectionManager connectionManager = ConnectionManager.getInstance();
- Connection conn = connectionManager.getConnection("jdbc", properties);
+ Connection conn = connectionManager.getConnection("jdbc-1", properties);
Statement statement = conn.createStatement();
ResultSet rs = statement.executeQuery("show databases;");
while (rs.next()) {
diff --git a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
index a425ea867..dabf81ef7 100644
--- a/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
+++ b/linkis-engineconn-plugins/jdbc/src/test/java/org/apache/linkis/manager/engineplugin/jdbc/executer/JDBCMultiDatasourceParserTest.scala
@@ -52,6 +52,7 @@ class JDBCMultiDatasourceParserTest {
datasource.setDataSourceType(dataSourceType)
datasource.setConnectParams(dbConnParams)
datasource.setPublishedVersionId(1L)
+ datasource.setVersionId(1L)
datasource.setExpire(false)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org