You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/12/19 09:15:11 UTC

[dolphinscheduler] branch dev updated: Remove datasource in procedure task (#13198)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2336b0ce9 Remove datasource in procedure task (#13198)
b2336b0ce9 is described below

commit b2336b0ce9e3ebbe57a92969c57a613964e5ccf2
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Dec 19 03:14:59 2022 -0600

    Remove datasource in procedure task (#13198)
---
 .../api/plugin/DataSourceProcessorProvider.java    |  7 +++
 .../plugin/task/procedure/ProcedureTask.java       | 71 ++++++----------------
 2 files changed, 25 insertions(+), 53 deletions(-)

diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
index 0b4994116a..6e21117c9f 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
@@ -18,9 +18,12 @@
 package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
 
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
 
 import java.util.Map;
 
+import lombok.NonNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +46,10 @@ public class DataSourceProcessorProvider {
         return DataSourceClientProviderHolder.INSTANCE;
     }
 
+    public DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
+        return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
+    }
+
     public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
         return dataSourcePluginManager.getDataSourceProcessorMap();
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 68b6c29e09..688374ba62 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -21,8 +21,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.sql.CallableStatement;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.HashMap;
@@ -96,19 +95,12 @@ public class ProcedureTask extends AbstractTask {
                 procedureParameters.getMethod(),
                 procedureParameters.getLocalParams());
 
-        Connection connection = null;
-        CallableStatement stmt = null;
-        try {
-            // load class
-            DbType dbType = DbType.valueOf(procedureParameters.getType());
-            // get datasource
-            ConnectionParam connectionParam =
-                    DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
-                            procedureTaskExecutionContext.getConnectionParams());
-
-            // get jdbc connection
-            connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
-
+        DbType dbType = DbType.valueOf(procedureParameters.getType());
+        DataSourceProcessor dataSourceProcessor =
+                DataSourceProcessorProvider.getInstance().getDataSourceProcessor(dbType);
+        ConnectionParam connectionParams =
+                dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
+        try (Connection connection = dataSourceProcessor.getConnection(connectionParams)) {
             Map<Integer, Property> sqlParamsMap = new HashMap<>();
             Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
                     : taskExecutionContext.getPrepareParamsMap();
@@ -116,30 +108,26 @@ public class ProcedureTask extends AbstractTask {
                 // set out params before format sql
                 paramsMap.putAll(procedureParameters.getOutProperty());
             }
-
-            // format sql
             String proceduerSql = formatSql(sqlParamsMap, paramsMap);
             // call method
-            stmt = connection.prepareCall(proceduerSql);
-
-            // set timeout
-            setTimeout(stmt);
+            try (CallableStatement stmt = connection.prepareCall(proceduerSql)) {
+                // set timeout
+                setTimeout(stmt);
 
-            // outParameterMap
-            Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
+                // outParameterMap
+                Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
 
-            stmt.executeUpdate();
+                stmt.executeUpdate();
 
-            // print the output parameters to the log
-            printOutParameter(stmt, outParameterMap);
+                // print the output parameters to the log
+                printOutParameter(stmt, outParameterMap);
 
-            setExitStatusCode(EXIT_CODE_SUCCESS);
+                setExitStatusCode(EXIT_CODE_SUCCESS);
+            }
         } catch (Exception e) {
             setExitStatusCode(EXIT_CODE_FAILURE);
             logger.error("procedure task error", e);
             throw new TaskException("Execute procedure task failed", e);
-        } finally {
-            close(stmt, connection);
         }
     }
 
@@ -220,29 +208,6 @@ public class ProcedureTask extends AbstractTask {
         }
     }
 
-    /**
-     * close jdbc resource
-     *
-     * @param stmt       stmt
-     * @param connection connection
-     */
-    private void close(PreparedStatement stmt, Connection connection) {
-        if (stmt != null) {
-            try {
-                stmt.close();
-            } catch (SQLException e) {
-                logger.error("close prepared statement error : {}", e.getMessage(), e);
-            }
-        }
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                logger.error("close connection error : {}", e.getMessage(), e);
-            }
-        }
-    }
-
     /**
      * get output parameter
      *