You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2023/07/24 02:18:26 UTC

[dolphinscheduler] branch revert-14193-dev created (now 3e9165d5d7)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a change to branch revert-14193-dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


      at 3e9165d5d7 Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)"

This branch includes the following new commits:

     new 3e9165d5d7 Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dolphinscheduler] 01/01: Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)"

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch revert-14193-dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 3e9165d5d76c570fba5b9907ba7761c23ef0a9fa
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Jul 24 10:18:21 2023 +0800

    Revert "datasource test and sql task Remove connection pool issues is #14179 (#14193)"
    
    This reverts commit e4fb5b30a45b5e2380841e8b37dff2f0408fc5a7.
---
 .../api/client/CommonDataSourceClient.java         | 42 ++++++++-----------
 .../azuresql/AzureSQLDataSourceClient.java         | 34 ++++++++++++++++
 .../datasource/hive/HiveDataSourceClient.java      | 30 ++++++++++++++
 .../datasource/kyuubi/KyuubiDataSourceClient.java  | 27 +++++++++++++
 .../kyuubi/KyuubiDataSourceClientTest.java         |  8 ++++
 .../redshift/RedshiftDataSourceClient.java         | 47 ++++++++++++++++++++++
 6 files changed, 162 insertions(+), 26 deletions(-)

diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
index 37d783b640..c87b3453a1 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.datasource.api.client;
 
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -24,13 +25,15 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
 import org.apache.commons.lang3.StringUtils;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 import com.google.common.base.Stopwatch;
+import com.zaxxer.hikari.HikariDataSource;
 
 @Slf4j
 public class CommonDataSourceClient implements DataSourceClient {
@@ -39,7 +42,8 @@ public class CommonDataSourceClient implements DataSourceClient {
     public static final String COMMON_VALIDATION_QUERY = "select 1";
 
     protected final BaseConnectionParam baseConnectionParam;
-    protected Connection connection;
+    protected HikariDataSource dataSource;
+    protected JdbcTemplate jdbcTemplate;
 
     public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
         this.baseConnectionParam = baseConnectionParam;
@@ -59,7 +63,8 @@ public class CommonDataSourceClient implements DataSourceClient {
     }
 
     protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
-        this.connection = buildConn(baseConnectionParam);
+        this.dataSource = JDBCDataSourceProvider.createJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
     }
 
     protected void checkUser(BaseConnectionParam baseConnectionParam) {
@@ -68,20 +73,6 @@ public class CommonDataSourceClient implements DataSourceClient {
         }
     }
 
-    private Connection buildConn(BaseConnectionParam baseConnectionParam) {
-        Connection conn = null;
-        try {
-            Class.forName(baseConnectionParam.getDriverClassName());
-            conn = DriverManager.getConnection(baseConnectionParam.getJdbcUrl(), baseConnectionParam.getUser(),
-                    baseConnectionParam.getPassword());
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Driver load fail", e);
-        } catch (SQLException e) {
-            throw new RuntimeException("JDBC connect failed", e);
-        }
-        return conn;
-    }
-
     protected void setDefaultUsername(BaseConnectionParam baseConnectionParam) {
         baseConnectionParam.setUser(COMMON_USER);
     }
@@ -101,7 +92,7 @@ public class CommonDataSourceClient implements DataSourceClient {
         // Checking data source client
         Stopwatch stopwatch = Stopwatch.createStarted();
         try {
-            this.connection.prepareStatement(this.baseConnectionParam.getValidationQuery()).executeQuery();
+            this.jdbcTemplate.execute(this.baseConnectionParam.getValidationQuery());
         } catch (Exception e) {
             throw new RuntimeException("JDBC connect failed", e);
         } finally {
@@ -113,21 +104,20 @@ public class CommonDataSourceClient implements DataSourceClient {
     @Override
     public Connection getConnection() {
         try {
-            return connection.isClosed() ? buildConn(baseConnectionParam) : connection;
+            return this.dataSource.getConnection();
         } catch (SQLException e) {
-            throw new RuntimeException("get conn is fail", e);
+            log.error("get druidDataSource Connection fail SQLException: {}", e.getMessage(), e);
+            return null;
         }
     }
 
     @Override
     public void close() {
-        log.info("do close connection {}.", baseConnectionParam.getDatabase());
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            log.info("colse connection fail");
-            throw new RuntimeException(e);
+        log.info("do close dataSource {}.", baseConnectionParam.getDatabase());
+        try (HikariDataSource closedDatasource = dataSource) {
+            // only close the resource
         }
+        this.jdbcTemplate = null;
     }
 
 }
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
index 53af3946b8..cf7db2e3b2 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-azure-sql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/azuresql/AzureSQLDataSourceClient.java
@@ -25,9 +25,14 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
 
+import com.google.common.base.Stopwatch;
+
 @Slf4j
 public class AzureSQLDataSourceClient extends CommonDataSourceClient {
 
@@ -44,4 +49,33 @@ public class AzureSQLDataSourceClient extends CommonDataSourceClient {
         return AzureSQLDataSourceProcessor.tokenGetConnection(connectionParam);
     }
 
+    @Override
+    public void checkClient() {
+
+        AzureSQLConnectionParam connectionParam = (AzureSQLConnectionParam) this.baseConnectionParam;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String validationQuery = this.baseConnectionParam.getValidationQuery();
+        if (!connectionParam.getMode().equals(AzureSQLAuthMode.ACCESSTOKEN)) {
+            // Checking data source client
+            try {
+                this.jdbcTemplate.execute(validationQuery);
+            } catch (Exception e) {
+                throw new RuntimeException("JDBC connect failed", e);
+            } finally {
+                log.info("Time to execute check jdbc client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        } else {
+            try (Statement statement = getConnection().createStatement()) {
+                if (!statement.execute(validationQuery)) {
+                    throw new SQLException("execute check azure sql token client failed : " + validationQuery);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } finally {
+                log.info("Time to execute check azure sql token client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        }
+    }
 }
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
index 3c28551e44..15270f60a3 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
@@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SEC
 
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.plugin.datasource.hive.security.UserGroupInformationFactory;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -32,9 +33,13 @@ import sun.security.krb5.Config;
 import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.SQLException;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 @Slf4j
 public class HiveDataSourceClient extends CommonDataSourceClient {
 
@@ -47,6 +52,17 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         log.info("PreInit in {}", getClass().getName());
     }
 
+    @Override
+    protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+        log.info("Create UserGroupInformation.");
+        UserGroupInformationFactory.login(baseConnectionParam.getUser());
+        log.info("Create ugi success.");
+
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        log.info("Init {} success.", getClass().getName());
+    }
+
     @Override
     protected void checkEnv(BaseConnectionParam baseConnectionParam) {
         super.checkEnv(baseConnectionParam);
@@ -70,6 +86,20 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
         }
     }
 
+    @Override
+    public Connection getConnection() {
+        Connection connection = null;
+        while (connection == null) {
+            try {
+                connection = dataSource.getConnection();
+            } catch (SQLException e) {
+                UserGroupInformationFactory.logout(baseConnectionParam.getUser());
+                UserGroupInformationFactory.login(baseConnectionParam.getUser());
+            }
+        }
+        return connection;
+    }
+
     @Override
     public void close() {
         try {
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
index 2d3954fff3..3e0af69577 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/main/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClient.java
@@ -18,11 +18,17 @@
 package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+
 import lombok.extern.slf4j.Slf4j;
 
+import org.springframework.jdbc.core.JdbcTemplate;
+
 @Slf4j
 public class KyuubiDataSourceClient extends CommonDataSourceClient {
 
@@ -35,11 +41,32 @@ public class KyuubiDataSourceClient extends CommonDataSourceClient {
         log.info("PreInit in {}", getClass().getName());
     }
 
+    @Override
+    protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
+
+        this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        log.info("Init {} success.", getClass().getName());
+    }
+
     @Override
     protected void checkEnv(BaseConnectionParam baseConnectionParam) {
         super.checkEnv(baseConnectionParam);
     }
 
+    @Override
+    public Connection getConnection() {
+        Connection connection = null;
+        while (connection == null) {
+            try {
+                connection = dataSource.getConnection();
+            } catch (SQLException e) {
+                log.error("Failed to get Kyuubi Connection.", e);
+            }
+        }
+        return connection;
+    }
+
     @Override
     public void close() {
         super.close();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
index 03b6f44c24..041420cc48 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-kyuubi/src/test/java/org/apache/dolphinscheduler/plugin/datasource/kyuubi/KyuubiDataSourceClientTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.plugin.datasource.kyuubi;
 
 import org.apache.dolphinscheduler.plugin.datasource.kyuubi.param.KyuubiConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.sql.Connection;
 
@@ -48,6 +49,13 @@ public class KyuubiDataSourceClientTest {
         Mockito.verify(kyuubiDataSourceClient).checkEnv(kyuubiConnectionParam);
     }
 
+    @Test
+    public void testInitClient() {
+        KyuubiConnectionParam kyuubiConnectionParam = new KyuubiConnectionParam();
+        kyuubiDataSourceClient.initClient(kyuubiConnectionParam, DbType.KYUUBI);
+        Mockito.verify(kyuubiDataSourceClient).initClient(kyuubiConnectionParam, DbType.KYUUBI);
+    }
+
     @Test
     public void testCheckClient() {
         kyuubiDataSourceClient.checkClient();
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
index 74aeb30ee2..186e5afd19 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/RedshiftDataSourceClient.java
@@ -18,11 +18,21 @@
 package org.apache.dolphinscheduler.plugin.datasource.redshift;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftAuthMode;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftConnectionParam;
+import org.apache.dolphinscheduler.plugin.datasource.redshift.param.RedshiftDataSourceProcessor;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
 import lombok.extern.slf4j.Slf4j;
 
+import com.google.common.base.Stopwatch;
+
 @Slf4j
 public class RedshiftDataSourceClient extends CommonDataSourceClient {
 
@@ -30,4 +40,41 @@ public class RedshiftDataSourceClient extends CommonDataSourceClient {
         super(baseConnectionParam, dbType);
     }
 
+    @Override
+    public Connection getConnection() {
+        RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
+        if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+            return super.getConnection();
+        }
+        return RedshiftDataSourceProcessor.getConnectionByIAM(connectionParam);
+    }
+
+    @Override
+    public void checkClient() {
+        RedshiftConnectionParam connectionParam = (RedshiftConnectionParam) this.baseConnectionParam;
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        String validationQuery = this.baseConnectionParam.getValidationQuery();
+        if (connectionParam.getMode().equals(RedshiftAuthMode.PASSWORD)) {
+            // Checking data source client
+            try {
+                this.jdbcTemplate.execute(validationQuery);
+            } catch (Exception e) {
+                throw new RuntimeException("JDBC connect failed", e);
+            } finally {
+                log.info("Time to execute check jdbc client with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        } else {
+            try (Statement statement = getConnection().createStatement()) {
+                if (!statement.execute(validationQuery)) {
+                    throw new SQLException("execute check redshift access key failed : " + validationQuery);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } finally {
+                log.info("Time to execute check redshift access key with sql {} for {} ms ",
+                        this.baseConnectionParam.getValidationQuery(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            }
+        }
+    }
 }