You are viewing a plain-text version of this content. (The canonical link that originally appeared here was lost in the plain-text conversion.)
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/23 12:43:47 UTC

[dolphinscheduler] branch dev updated: [Fix-10918] Close datasource when expire from guava cache (#11120)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 71cf7e1baa [Fix-10918] Close datasource when expire from guava cache (#11120)
71cf7e1baa is described below

commit 71cf7e1baa7b813ac381c37c0345e837d2c07557
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat Jul 23 20:43:37 2022 +0800

    [Fix-10918] Close datasource when expire from guava cache (#11120)
    
    * Close the datasource when it is removed from the Guava cache (e.g. on expiry)
    * Remove the duplicate datasource in HiveDataSourceClient
---
 .../api/client/CommonDataSourceClient.java         | 11 +++++-----
 .../api/plugin/DataSourceClientProvider.java       | 25 ++++++++++++++--------
 .../datasource/hive/HiveDataSourceClient.java      | 25 +++++++++++-----------
 .../spi/datasource/DataSourceClient.java           |  3 ++-
 4 files changed, 37 insertions(+), 27 deletions(-)

diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
index 37af397758..50de82f93d 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
@@ -27,13 +27,12 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 
-import javax.sql.DataSource;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
 
 public class CommonDataSourceClient implements DataSourceClient {
 
@@ -43,7 +42,7 @@ public class CommonDataSourceClient implements DataSourceClient {
     public static final String COMMON_VALIDATION_QUERY = "select 1";
 
     protected final BaseConnectionParam baseConnectionParam;
-    protected DataSource dataSource;
+    protected HikariDataSource dataSource;
     protected JdbcTemplate jdbcTemplate;
 
     public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
@@ -113,8 +112,10 @@ public class CommonDataSourceClient implements DataSourceClient {
 
     @Override
     public void close() {
-        logger.info("do close dataSource.");
-        this.dataSource = null;
+        logger.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+        try (HikariDataSource closedDatasource = dataSource) {
+            // only close the resource
+        }
         this.jdbcTemplate = null;
     }
 
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
index 6c2f8b1ac6..a849fa6b48 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -27,23 +25,32 @@ import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
 
 public class DataSourceClientProvider {
     private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
 
-    private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
+    private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
     private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
-            .expireAfterWrite(duration, TimeUnit.HOURS)
-            .maximumSize(100)
-            .build();
+        .expireAfterWrite(duration, TimeUnit.HOURS)
+        .removalListener((RemovalListener<String, DataSourceClient>) notification -> {
+            try (DataSourceClient closedClient = notification.getValue()) {
+                logger.info("Datasource: {} is removed from cache due to expire", notification.getKey());
+            }
+        })
+        .maximumSize(100)
+        .build();
     private DataSourcePluginManager dataSourcePluginManager;
 
     private DataSourceClientProvider() {
@@ -61,7 +68,7 @@ public class DataSourceClientProvider {
     public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
         BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
         String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
-        logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
+        logger.info("Get connection from datasource {}", datasourceUniqueId);
 
         DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
             Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index dd4f7e89ca..13c1263fa2 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -17,8 +17,10 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.hive;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.zaxxer.hikari.HikariDataSource;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
+
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
 import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
@@ -27,11 +29,9 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.dolphinscheduler.spi.utils.Constants;
 import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.security.krb5.Config;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -41,7 +41,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import sun.security.krb5.Config;
 
 public class HiveDataSourceClient extends CommonDataSourceClient {
 
@@ -50,7 +55,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
     private ScheduledExecutorService kerberosRenewalService;
 
     private Configuration hadoopConf;
-    protected HikariDataSource oneSessionDataSource;
     private UserGroupInformation ugi;
     private boolean retryGetConnection = true;
 
@@ -76,7 +80,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         logger.info("Create ugi success.");
 
         super.initClient(baseConnectionParam, dbType);
-        this.oneSessionDataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
         logger.info("Init {} success.", getClass().getName());
     }
 
@@ -144,7 +148,7 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
     @Override
     public Connection getConnection() {
         try {
-            return oneSessionDataSource.getConnection();
+            return dataSource.getConnection();
         } catch (SQLException e) {
             boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
             if (retryGetConnection && kerberosStartupState) {
@@ -166,8 +170,5 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         logger.info("close {}.", this.getClass().getSimpleName());
         kerberosRenewalService.shutdown();
         this.ugi = null;
-
-        this.oneSessionDataSource.close();
-        this.oneSessionDataSource = null;
     }
 }
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
index 879d198284..82eb1f02cb 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java
@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.spi.datasource;
 
 import java.sql.Connection;
 
-public interface DataSourceClient {
+public interface DataSourceClient extends AutoCloseable {
 
     void checkClient();
 
+    @Override
     void close();
 
     Connection getConnection();