You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/28 14:33:28 UTC
[dolphinscheduler] branch 2.0.2-prepare updated: [cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new bcef5f5 [cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)
bcef5f5 is described below
commit bcef5f55888ce106ea7f424dfcda37c0342d9c55
Author: BaoLiang <29...@users.noreply.github.com>
AuthorDate: Tue Dec 28 22:33:21 2021 +0800
[cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)
* [FIX-#6727][worker-server] Fix procedure params bug (#7680)
* fix Procedure param error
* code style
Co-authored-by: wangxj <wangxj31>
# Conflicts:
# dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
# dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
* update
Co-authored-by: wangxj3 <85...@qq.com>
---
.../plugin/task/api/AbstractTaskExecutor.java | 36 +++++++++++
.../plugin/task/procedure/ProcedureParameters.java | 35 +++++++++++
.../plugin/task/procedure/ProcedureTask.java | 69 ++++++++++++----------
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 32 +---------
4 files changed, 109 insertions(+), 63 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
index 5b5661c..22fbc6b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
@@ -19,10 +19,14 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
+import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +39,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+ public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
/**
* constructor
*
@@ -61,4 +66,35 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
logger.info(" -> {}", joiner);
}
}
+
+ /**
+ * regular expressions match the contents between two specified strings
+ *
+ * @param content content
+ * @param rgex rgex
+ * @param sqlParamsMap sql params map
+ * @param paramsPropsMap params props map
+ */
+ public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
+ Map<String, Property> paramsPropsMap,int taskInstanceId) {
+ Pattern pattern = Pattern.compile(rgex);
+ Matcher m = pattern.matcher(content);
+ int index = 1;
+ while (m.find()) {
+
+ String paramName = m.group(1);
+ Property prop = paramsPropsMap.get(paramName);
+
+ if (prop == null) {
+ logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
+ } else {
+ sqlParamsMap.put(index, prop);
+ index++;
+ logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
+ }
+
+ }
+ }
+
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
index bfb56c2..d0e35b9 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
@@ -17,12 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* procedure parameter
@@ -39,6 +43,8 @@ public class ProcedureParameters extends AbstractParameters {
*/
private int datasource;
+ private Map<String, Property> outProperty;
+
/**
* procedure name
*/
@@ -86,4 +92,33 @@ public class ProcedureParameters extends AbstractParameters {
+ ", method='" + method + '\''
+ '}';
}
+
+ public void dealOutParam4Procedure(Object result, String pop) {
+ Map<String, Property> properties = getOutProperty();
+ if (this.outProperty == null) {
+ return;
+ }
+ properties.get(pop).setValue(String.valueOf(result));
+ varPool.add(properties.get(pop));
+ }
+
+ public Map<String, Property> getOutProperty() {
+ if (this.outProperty != null) {
+ return this.outProperty;
+ }
+ if (CollectionUtils.isEmpty(localParams)) {
+ return null;
+ }
+ List<Property> outPropertyList = getOutProperty(localParams);
+ Map<String, Property> outProperty = new HashMap<>();
+ for (Property info : outPropertyList) {
+ outProperty.put(info.getProp(), info);
+ }
+ this.outProperty = outProperty;
+ return this.outProperty;
+ }
+
+ public void setOutProperty(Map<String, Property> outProperty) {
+ this.outProperty = outProperty;
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 202b75d..35a95f1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -36,17 +36,15 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.apache.commons.collections4.CollectionUtils;
-
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+
/**
* procedure task
*/
@@ -102,18 +100,17 @@ public class ProcedureTask extends AbstractTaskExecutor {
// get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
-
- // combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
+ Map<Integer, Property> sqlParamsMap = new HashMap<>();
+ Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+ String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
- stmt = connection.prepareCall(procedureParameters.getMethod());
+ stmt = connection.prepareCall(proceduerSql);
// set timeout
setTimeout(stmt);
// outParameterMap
- Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap);
+ Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
stmt.executeUpdate();
@@ -130,6 +127,12 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
}
+ private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
+ // combining local and global parameters
+ setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
+ return procedureParameters.getMethod().replaceAll(rgex, "?");
+ }
+
/**
* print outParameter
*
@@ -145,7 +148,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
String prop = property.getProp();
DataType dataType = property.getType();
// get output parameter
- getOutputParameter(stmt, index, prop, dataType);
+ procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop);
}
}
@@ -157,34 +160,25 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @return outParameterMap
* @throws Exception Exception
*/
- private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception {
+ private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
+ , Map<String, Property> totalParamsMap) throws Exception {
Map<Integer, Property> outParameterMap = new HashMap<>();
if (procedureParameters.getLocalParametersMap() == null) {
return outParameterMap;
}
- Collection<Property> userDefParamsList = procedureParameters.getLocalParametersMap().values();
-
- if (CollectionUtils.isEmpty(userDefParamsList)) {
- return outParameterMap;
- }
-
int index = 1;
- for (Property property : userDefParamsList) {
- logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
- , property.getProp(),
- property.getDirect(),
- property.getType(),
- property.getValue());
- // set parameters
- if (property.getDirect().equals(Direct.IN)) {
- ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
- } else if (property.getDirect().equals(Direct.OUT)) {
- setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
- property.setValue(paramsMap.get(property.getProp()).getValue());
- outParameterMap.put(index, property);
+ if (paramsMap != null) {
+ for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
+ Property property = entry.getValue();
+ if (property.getDirect().equals(Direct.IN)) {
+ ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
+ } else if (property.getDirect().equals(Direct.OUT)) {
+ setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
+ outParameterMap.put(index, property);
+ }
+ index++;
}
- index++;
}
return outParameterMap;
@@ -235,38 +229,49 @@ public class ProcedureTask extends AbstractTaskExecutor {
* @param dataType dataType
* @throws SQLException SQLException
*/
- private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+ private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+ Object value = null;
switch (dataType) {
case VARCHAR:
logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
+ value = stmt.getString(index);
break;
case INTEGER:
logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
+ value = stmt.getInt(index);
break;
case LONG:
logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
+ value = stmt.getLong(index);
break;
case FLOAT:
logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
+ value = stmt.getFloat(index);
break;
case DOUBLE:
logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
+ value = stmt.getDouble(index);
break;
case DATE:
logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
+ value = stmt.getDate(index);
break;
case TIME:
logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
+ value = stmt.getTime(index);
break;
case TIMESTAMP:
logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
+ value = stmt.getTimestamp(index);
break;
case BOOLEAN:
logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
+ value = stmt.getBoolean(index);
break;
default:
break;
}
+ return value;
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index d8bae7a..a260130 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -406,35 +406,6 @@ public class SqlTask extends AbstractTaskExecutor {
}
/**
- * regular expressions match the contents between two specified strings
- *
- * @param content content
- * @param rgex rgex
- * @param sqlParamsMap sql params map
- * @param paramsPropsMap params props map
- */
- public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
- Pattern pattern = Pattern.compile(rgex);
- Matcher m = pattern.matcher(content);
- int index = 1;
- while (m.find()) {
-
- String paramName = m.group(1);
- Property prop = paramsPropsMap.get(paramName);
-
- if (prop == null) {
- logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
- + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
- } else {
- sqlParamsMap.put(index, prop);
- index++;
- logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
- }
-
- }
- }
-
- /**
* print replace sql
*
* @param content content
@@ -485,8 +456,7 @@ public class SqlTask extends AbstractTaskExecutor {
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
// special characters need to be escaped, ${} needs to be escaped
- String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
- setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
+ setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
//Replace the original value in sql !{...} ,Does not participate in precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap);