You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/23 12:43:47 UTC
[dolphinscheduler] branch dev updated: [Fix-10918] Close datasource when expire from guava cache (#11120)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 71cf7e1baa [Fix-10918] Close datasource when expire from guava cache (#11120)
71cf7e1baa is described below
commit 71cf7e1baa7b813ac381c37c0345e837d2c07557
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Jul 23 20:43:37 2022 +0800
[Fix-10918] Close datasource when expire from guava cache (#11120)
* Close datasource when expire from guava cache
* Remove duplicate datasource in HiveDataSourceClient
---
.../api/client/CommonDataSourceClient.java | 11 +++++-----
.../api/plugin/DataSourceClientProvider.java | 25 ++++++++++++++--------
.../datasource/hive/HiveDataSourceClient.java | 25 +++++++++++-----------
.../spi/datasource/DataSourceClient.java | 3 ++-
4 files changed, 37 insertions(+), 27 deletions(-)
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
index 37af397758..50de82f93d 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
@@ -27,13 +27,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
public class CommonDataSourceClient implements DataSourceClient {
@@ -43,7 +42,7 @@ public class CommonDataSourceClient implements DataSourceClient {
public static final String COMMON_VALIDATION_QUERY = "select 1";
protected final BaseConnectionParam baseConnectionParam;
- protected DataSource dataSource;
+ protected HikariDataSource dataSource;
protected JdbcTemplate jdbcTemplate;
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
@@ -113,8 +112,10 @@ public class CommonDataSourceClient implements DataSourceClient {
@Override
public void close() {
- logger.info("do close dataSource.");
- this.dataSource = null;
+ logger.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+ try (HikariDataSource closedDatasource = dataSource) {
+ // only close the resource
+ }
this.jdbcTemplate = null;
}
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
index 6c2f8b1ac6..a849fa6b48 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -27,23 +25,32 @@ import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
public class DataSourceClientProvider {
private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
- private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
+ private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
- .expireAfterWrite(duration, TimeUnit.HOURS)
- .maximumSize(100)
- .build();
+ .expireAfterWrite(duration, TimeUnit.HOURS)
+ .removalListener((RemovalListener<String, DataSourceClient>) notification -> {
+ try (DataSourceClient closedClient = notification.getValue()) {
+ logger.info("Datasource: {} is removed from cache due to expire", notification.getKey());
+ }
+ })
+ .maximumSize(100)
+ .build();
private DataSourcePluginManager dataSourcePluginManager;
private DataSourceClientProvider() {
@@ -61,7 +68,7 @@ public class DataSourceClientProvider {
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
- logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+ logger.info("Get connection from datasource {}", datasourceUniqueId);
DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index dd4f7e89ca..13c1263fa2 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -17,8 +17,10 @@
package org.apache.dolphinscheduler.plugin.datasource.hive;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.zaxxer.hikari.HikariDataSource;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
+
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
@@ -27,11 +29,9 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.security.krb5.Config;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -41,7 +41,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import sun.security.krb5.Config;
public class HiveDataSourceClient extends CommonDataSourceClient {
@@ -50,7 +55,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
private ScheduledExecutorService kerberosRenewalService;
private Configuration hadoopConf;
- protected HikariDataSource oneSessionDataSource;
private UserGroupInformation ugi;
private boolean retryGetConnection = true;
@@ -76,7 +80,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
logger.info("Create ugi success.");
super.initClient(baseConnectionParam, dbType);
- this.oneSessionDataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+ this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
logger.info("Init {} success.", getClass().getName());
}
@@ -144,7 +148,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
@Override
public Connection getConnection() {
try {
- return oneSessionDataSource.getConnection();
+ return dataSource.getConnection();
} catch (SQLException e) {
boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
if (retryGetConnection && kerberosStartupState) {
@@ -166,8 +170,5 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
logger.info("close {}.", this.getClass().getSimpleName());
kerberosRenewalService.shutdown();
this.ugi = null;
-
- this.oneSessionDataSource.close();
- this.oneSessionDataSource = null;
}
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
index 879d198284..82eb1f02cb 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.spi.datasource;
import java.sql.Connection;
-public interface DataSourceClient {
+public interface DataSourceClient extends AutoCloseable {
void checkClient();
+ @Override
void close();
Connection getConnection();