You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/12/19 09:15:11 UTC
[dolphinscheduler] branch dev updated: Remove datasource in procedure task (#13198)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b2336b0ce9 Remove datasource in procedure task (#13198)
b2336b0ce9 is described below
commit b2336b0ce9e3ebbe57a92969c57a613964e5ccf2
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Dec 19 03:14:59 2022 -0600
Remove datasource in procedure task (#13198)
---
.../api/plugin/DataSourceProcessorProvider.java | 7 +++
.../plugin/task/procedure/ProcedureTask.java | 71 ++++++----------------
2 files changed, 25 insertions(+), 53 deletions(-)
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
index 0b4994116a..6e21117c9f 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
@@ -18,9 +18,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
import java.util.Map;
+import lombok.NonNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +46,10 @@ public class DataSourceProcessorProvider {
return DataSourceClientProviderHolder.INSTANCE;
}
+ public DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
+ return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
+ }
+
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return dataSourcePluginManager.getDataSourceProcessorMap();
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 68b6c29e09..688374ba62 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -21,8 +21,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.StringUtils;
import java.sql.CallableStatement;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
@@ -96,19 +95,12 @@ public class ProcedureTask extends AbstractTask {
procedureParameters.getMethod(),
procedureParameters.getLocalParams());
- Connection connection = null;
- CallableStatement stmt = null;
- try {
- // load class
- DbType dbType = DbType.valueOf(procedureParameters.getType());
- // get datasource
- ConnectionParam connectionParam =
- DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
- procedureTaskExecutionContext.getConnectionParams());
-
- // get jdbc connection
- connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
-
+ DbType dbType = DbType.valueOf(procedureParameters.getType());
+ DataSourceProcessor dataSourceProcessor =
+ DataSourceProcessorProvider.getInstance().getDataSourceProcessor(dbType);
+ ConnectionParam connectionParams =
+ dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
+ try (Connection connection = dataSourceProcessor.getConnection(connectionParams)) {
Map<Integer, Property> sqlParamsMap = new HashMap<>();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
: taskExecutionContext.getPrepareParamsMap();
@@ -116,30 +108,26 @@ public class ProcedureTask extends AbstractTask {
// set out params before format sql
paramsMap.putAll(procedureParameters.getOutProperty());
}
-
- // format sql
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
- stmt = connection.prepareCall(proceduerSql);
-
- // set timeout
- setTimeout(stmt);
+ try (CallableStatement stmt = connection.prepareCall(proceduerSql)) {
+ // set timeout
+ setTimeout(stmt);
- // outParameterMap
- Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
+ // outParameterMap
+ Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
- stmt.executeUpdate();
+ stmt.executeUpdate();
- // print the output parameters to the log
- printOutParameter(stmt, outParameterMap);
+ // print the output parameters to the log
+ printOutParameter(stmt, outParameterMap);
- setExitStatusCode(EXIT_CODE_SUCCESS);
+ setExitStatusCode(EXIT_CODE_SUCCESS);
+ }
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
logger.error("procedure task error", e);
throw new TaskException("Execute procedure task failed", e);
- } finally {
- close(stmt, connection);
}
}
@@ -220,29 +208,6 @@ public class ProcedureTask extends AbstractTask {
}
}
- /**
- * close jdbc resource
- *
- * @param stmt stmt
- * @param connection connection
- */
- private void close(PreparedStatement stmt, Connection connection) {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- logger.error("close prepared statement error : {}", e.getMessage(), e);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- logger.error("close connection error : {}", e.getMessage(), e);
- }
- }
- }
-
/**
* get output parameter
*