You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/28 14:33:28 UTC

[dolphinscheduler] branch 2.0.2-prepare updated: [cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new bcef5f5  [cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)
bcef5f5 is described below

commit bcef5f55888ce106ea7f424dfcda37c0342d9c55
Author: BaoLiang <29...@users.noreply.github.com>
AuthorDate: Tue Dec 28 22:33:21 2021 +0800

    [cherry-pick][FIX-#6727] Fix procedure params bug (#7680) (#7692)
    
    * [FIX-#6727][worker-server] Fix procedure params bug (#7680)
    
    * fix Procedure param error
    
    * code style
    
    Co-authored-by: wangxj <wangxj31>
    # Conflicts:
    #	dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
    #	dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
    
    * update
    
    Co-authored-by: wangxj3 <85...@qq.com>
---
 .../plugin/task/api/AbstractTaskExecutor.java      | 36 +++++++++++
 .../plugin/task/procedure/ProcedureParameters.java | 35 +++++++++++
 .../plugin/task/procedure/ProcedureTask.java       | 69 ++++++++++++----------
 .../dolphinscheduler/plugin/task/sql/SqlTask.java  | 32 +---------
 4 files changed, 109 insertions(+), 63 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
index 5b5661c..22fbc6b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
@@ -19,10 +19,14 @@ package org.apache.dolphinscheduler.plugin.task.api;
 
 import org.apache.dolphinscheduler.spi.task.AbstractTask;
 import org.apache.dolphinscheduler.spi.task.TaskConstants;
+import org.apache.dolphinscheduler.spi.task.Property;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 
+import java.util.Map;
 import java.util.StringJoiner;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +39,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
 
     protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
 
+    public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
     /**
      * constructor
      *
@@ -61,4 +66,35 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
             logger.info(" -> {}", joiner);
         }
     }
+
+    /**
+     * regular expressions match the contents between two specified strings
+     *
+     * @param content content
+     * @param rgex rgex
+     * @param sqlParamsMap sql params map
+     * @param paramsPropsMap params props map
+     */
+    public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
+                                Map<String, Property> paramsPropsMap,int taskInstanceId) {
+        Pattern pattern = Pattern.compile(rgex);
+        Matcher m = pattern.matcher(content);
+        int index = 1;
+        while (m.find()) {
+
+            String paramName = m.group(1);
+            Property prop = paramsPropsMap.get(paramName);
+
+            if (prop == null) {
+                logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+                    + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
+            } else {
+                sqlParamsMap.put(index, prop);
+                index++;
+                logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
+            }
+
+        }
+    }
+
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
index bfb56c2..d0e35b9 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureParameters.java
@@ -17,12 +17,16 @@
 
 package org.apache.dolphinscheduler.plugin.task.procedure;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.spi.task.AbstractParameters;
+import org.apache.dolphinscheduler.spi.task.Property;
 import org.apache.dolphinscheduler.spi.task.ResourceInfo;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * procedure parameter
@@ -39,6 +43,8 @@ public class ProcedureParameters extends AbstractParameters {
      */
     private int datasource;
 
+    private Map<String, Property> outProperty;
+
     /**
      * procedure name
      */
@@ -86,4 +92,33 @@ public class ProcedureParameters extends AbstractParameters {
                 + ", method='" + method + '\''
                 + '}';
     }
+
+    public void dealOutParam4Procedure(Object result, String pop) {
+        Map<String, Property> properties = getOutProperty();
+        if (this.outProperty == null) {
+            return;
+        }
+        properties.get(pop).setValue(String.valueOf(result));
+        varPool.add(properties.get(pop));
+    }
+
+    public Map<String, Property> getOutProperty() {
+        if (this.outProperty != null) {
+            return this.outProperty;
+        }
+        if (CollectionUtils.isEmpty(localParams)) {
+            return null;
+        }
+        List<Property> outPropertyList = getOutProperty(localParams);
+        Map<String, Property> outProperty = new HashMap<>();
+        for (Property info : outPropertyList) {
+            outProperty.put(info.getProp(), info);
+        }
+        this.outProperty = outProperty;
+        return this.outProperty;
+    }
+
+    public void setOutProperty(Map<String, Property> outProperty) {
+        this.outProperty = outProperty;
+    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 202b75d..35a95f1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -36,17 +36,15 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
-import org.apache.commons.collections4.CollectionUtils;
-
 import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+
 /**
  * procedure task
  */
@@ -102,18 +100,17 @@ public class ProcedureTask extends AbstractTaskExecutor {
 
             // get jdbc connection
             connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
-
-            // combining local and global parameters
-            Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
+            Map<Integer, Property> sqlParamsMap = new HashMap<>();
+            Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+            String proceduerSql = formatSql(sqlParamsMap, paramsMap);
             // call method
-            stmt = connection.prepareCall(procedureParameters.getMethod());
+            stmt = connection.prepareCall(proceduerSql);
 
             // set timeout
             setTimeout(stmt);
 
             // outParameterMap
-            Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap);
+            Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
 
             stmt.executeUpdate();
 
@@ -130,6 +127,12 @@ public class ProcedureTask extends AbstractTaskExecutor {
         }
     }
 
+    private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
+        // combining local and global parameters
+        setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
+        return procedureParameters.getMethod().replaceAll(rgex, "?");
+    }
+
     /**
      * print outParameter
      *
@@ -145,7 +148,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
             String prop = property.getProp();
             DataType dataType = property.getType();
             // get output parameter
-            getOutputParameter(stmt, index, prop, dataType);
+            procedureParameters.dealOutParam4Procedure(getOutputParameter(stmt, index, prop, dataType), prop);
         }
     }
 
@@ -157,34 +160,25 @@ public class ProcedureTask extends AbstractTaskExecutor {
      * @return outParameterMap
      * @throws Exception Exception
      */
-    private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<String, Property> paramsMap) throws Exception {
+    private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
+            , Map<String, Property> totalParamsMap) throws Exception {
         Map<Integer, Property> outParameterMap = new HashMap<>();
         if (procedureParameters.getLocalParametersMap() == null) {
             return outParameterMap;
         }
 
-        Collection<Property> userDefParamsList = procedureParameters.getLocalParametersMap().values();
-
-        if (CollectionUtils.isEmpty(userDefParamsList)) {
-            return outParameterMap;
-        }
-
         int index = 1;
-        for (Property property : userDefParamsList) {
-            logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
-                    , property.getProp(),
-                    property.getDirect(),
-                    property.getType(),
-                    property.getValue());
-            // set parameters
-            if (property.getDirect().equals(Direct.IN)) {
-                ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
-            } else if (property.getDirect().equals(Direct.OUT)) {
-                setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
-                property.setValue(paramsMap.get(property.getProp()).getValue());
-                outParameterMap.put(index, property);
+        if (paramsMap != null) {
+            for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
+                Property property = entry.getValue();
+                if (property.getDirect().equals(Direct.IN)) {
+                    ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
+                } else if (property.getDirect().equals(Direct.OUT)) {
+                    setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
+                    outParameterMap.put(index, property);
+                }
+                index++;
             }
-            index++;
         }
 
         return outParameterMap;
@@ -235,38 +229,49 @@ public class ProcedureTask extends AbstractTaskExecutor {
      * @param dataType dataType
      * @throws SQLException SQLException
      */
-    private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+    private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+        Object value = null;
         switch (dataType) {
             case VARCHAR:
                 logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
+                value = stmt.getString(index);
                 break;
             case INTEGER:
                 logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
+                value = stmt.getInt(index);
                 break;
             case LONG:
                 logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
+                value = stmt.getLong(index);
                 break;
             case FLOAT:
                 logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
+                value = stmt.getFloat(index);
                 break;
             case DOUBLE:
                 logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
+                value = stmt.getDouble(index);
                 break;
             case DATE:
                 logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
+                value = stmt.getDate(index);
                 break;
             case TIME:
                 logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
+                value = stmt.getTime(index);
                 break;
             case TIMESTAMP:
                 logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
+                value = stmt.getTimestamp(index);
                 break;
             case BOOLEAN:
                 logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
+                value = stmt.getBoolean(index);
                 break;
             default:
                 break;
         }
+        return value;
     }
 
     @Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index d8bae7a..a260130 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -406,35 +406,6 @@ public class SqlTask extends AbstractTaskExecutor {
     }
 
     /**
-     * regular expressions match the contents between two specified strings
-     *
-     * @param content content
-     * @param rgex rgex
-     * @param sqlParamsMap sql params map
-     * @param paramsPropsMap params props map
-     */
-    public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
-        Pattern pattern = Pattern.compile(rgex);
-        Matcher m = pattern.matcher(content);
-        int index = 1;
-        while (m.find()) {
-
-            String paramName = m.group(1);
-            Property prop = paramsPropsMap.get(paramName);
-
-            if (prop == null) {
-                logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
-                        + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
-            } else {
-                sqlParamsMap.put(index, prop);
-                index++;
-                logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
-            }
-
-        }
-    }
-
-    /**
      * print replace sql
      *
      * @param content content
@@ -485,8 +456,7 @@ public class SqlTask extends AbstractTaskExecutor {
         //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
         sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
         // special characters need to be escaped, ${} needs to be escaped
-        String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
-        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
+        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
         //Replace the original value in sql !{...} ,Does not participate in precompilation
         String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
         sql = replaceOriginalValue(sql, rgexo, paramsMap);