You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/12/25 05:35:45 UTC

[dolphinscheduler] branch 2.0.8-prepare updated: [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)

This is an automated email from the ASF dual-hosted git repository.

jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
     new 4515d3191f [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)
4515d3191f is described below

commit 4515d3191fbb018bb215f6eca3c3a7d48f9f1d37
Author: Aaron Wang <wa...@gmail.com>
AuthorDate: Sun Dec 25 13:35:33 2022 +0800

    [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)
    
    * fix start params override global params bug
    
    * set startup parameters the highest priority
    
    * add global parameter prefix
    
    * fix import
---
 .../apache/dolphinscheduler/common/Constants.java  |  6 +++
 .../master/runner/WorkflowExecuteThread.java       | 43 +++++++++++++++++++++-
 .../service/process/ProcessService.java            | 28 ++++++++++----
 .../spi/task/paramparser/ParamUtils.java           | 31 +++++++++++++---
 .../spi/task/paramparser/PlaceholderUtils.java     |  7 +++-
 .../dolphinscheduler/spi/utils/Constants.java      |  5 +++
 .../plugin/task/shell/ShellTask.java               |  2 +-
 7 files changed, 105 insertions(+), 17 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1f6558a0db..70fab13f76 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -935,6 +935,12 @@ public final class Constants {
      */
     public static final String LOCALE_LANGUAGE = "language";
 
+    /**
+     * temporary parameter prefix
+     */
+    public static final String START_UP_PARAMS_PREFIX = "startup-";
+    public static final String GLOBAL_PARAMS_PREFIX = "global-";
+
     /**
      * driver
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 8a0f5f2986..8f96c20c7d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -23,11 +23,15 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_ST
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.common.enums.Direct.IN;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DependResult;
-import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -688,6 +692,9 @@ public class WorkflowExecuteThread implements Runnable {
         if (processInstance.isComplementData() && complementListDate.size() == 0) {
             Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
             if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+                // reset global params when there are start parameters
+                setGlobalParamIfCommanded(processDefinition, cmdParam);
+
                 Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
                 Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
                 List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@@ -944,7 +951,7 @@ public class WorkflowExecuteThread implements Runnable {
 
     private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
         //for this taskInstance all the param in this part is IN.
-        thisProperty.setDirect(Direct.IN);
+        thisProperty.setDirect(IN);
         //get the pre taskInstance Property's name
         String proName = thisProperty.getProp();
         //if the Previous nodes have the Property of same name
@@ -1629,4 +1636,36 @@ public class WorkflowExecuteThread implements Runnable {
     public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
         return activeTaskProcessorMaps;
     }
+
+    private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+        // purpose: overlay the command's start/father params onto the definition's global params under distinct key prefixes; first read the start params JSON, if present
+        Map<String, String> startParamMap = new HashMap<>();
+        if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+            String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+            startParamMap = JSONUtils.toMap(startParamJson); // NOTE(review): if toMap can return null, the putAll below would throw - confirm
+        }
+        Map<String, String> fatherParamMap = new HashMap<>();
+        if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+            String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+            fatherParamMap = JSONUtils.toMap(fatherParamJson);
+        }
+        startParamMap.putAll(fatherParamMap); // father params are merged in last, so they replace start params with the same key
+        Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+        List<Property> globalParamList = processDefinition.getGlobalParamList();
+        if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) { // nothing is changed when there are no start/father params or no global param map
+            Map<String, String> tempGlobalMap = new HashMap<>();
+            // copy every global param into the new map under a GLOBAL_PARAMS_PREFIX-prefixed key
+            for (Map.Entry<String, String> param : globalMap.entrySet()) {
+                tempGlobalMap.put(GLOBAL_PARAMS_PREFIX+ param.getKey(), param.getValue());
+            }
+            globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp())); // renames the existing Property objects in place
+            // add each merged start param under a START_UP_PARAMS_PREFIX-prefixed key, to both the new map and the param list (as an IN / VARCHAR Property)
+            for (Entry<String, String> startParam : startParamMap.entrySet()) {
+                String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
+                tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
+                globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
+            }
+            processDefinition.setGlobalParamMap(tempGlobalMap); // the definition's map is replaced by the prefixed copy
+        }
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b79e31bee1..1d017eb4db 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -26,9 +26,14 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
 
 import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.common.enums.Direct.IN;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -653,15 +658,22 @@ public class ProcessService {
             fatherParamMap = JSONUtils.toMap(fatherParamJson);
         }
         startParamMap.putAll(fatherParamMap);
-        // set start param into global params
-        if (startParamMap.size() > 0
-                && processDefinition.getGlobalParamMap() != null) {
-            for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
-                String val = startParamMap.get(param.getKey());
-                if (val != null) {
-                    param.setValue(val);
-                }
+        Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+        List<Property> globalParamList = processDefinition.getGlobalParamList();
+        if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
+            Map<String, String> tempGlobalMap = new HashMap<>();
+            // add prefix for global params
+            for (Map.Entry<String, String> param : globalMap.entrySet()) {
+                tempGlobalMap.put(GLOBAL_PARAMS_PREFIX + param.getKey(), param.getValue());
+            }
+            globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp()));
+            // set start param into global params, add prefix for startup params
+            for (Entry<String, String> startParam : startParamMap.entrySet()) {
+                String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
+                tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
+                globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
             }
+            processDefinition.setGlobalParamMap(tempGlobalMap);
         }
     }
 
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
index 069f941ff8..a96fd29883 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.spi.task.paramparser;
 
 import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
 import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
-
+import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.dolphinscheduler.spi.enums.CommandType;
 import org.apache.dolphinscheduler.spi.enums.DataType;
 import org.apache.dolphinscheduler.spi.task.AbstractParameters;
@@ -60,12 +62,14 @@ public class ParamUtils {
         CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
         Date scheduleTime = taskExecutionContext.getScheduleTime();
 
+        Map<String, Property> convertedParams = new HashMap<>();
+
         // combining local and global parameters
         Map<String, Property> localParams = parameters.getLocalParametersMap();
 
         Map<String, Property> varParams = parameters.getVarPoolMap();
 
-        if (globalParams == null && localParams == null) {
+        if (MapUtils.isEmpty(globalParams) && MapUtils.isEmpty(localParams)) {
             return null;
         }
         // if it is a complement,
@@ -75,8 +79,7 @@ public class ParamUtils {
                 .getBusinessTime(commandType,
                         scheduleTime);
 
-        if (globalParamsMap != null) {
-
+        if (MapUtils.isNotEmpty(globalParamsMap)) {
             params.putAll(globalParamsMap);
         }
 
@@ -87,12 +90,19 @@ public class ParamUtils {
 
         if (globalParams != null && localParams != null) {
             globalParams.putAll(localParams);
+            for (Map.Entry<String, Property> entry : localParams.entrySet()) {
+                convertedParams.put(entry.getKey(), entry.getValue());
+            }
         } else if (globalParams == null && localParams != null) {
             globalParams = localParams;
+            convertedParams = localParams;
         }
         if (varParams != null) {
             varParams.putAll(globalParams);
             globalParams = varParams;
+            for (Map.Entry<String, Property> entry : varParams.entrySet()) {
+                convertedParams.put(entry.getKey(), entry.getValue());
+            }
         }
         Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
         while (iter.hasNext()) {
@@ -111,9 +121,20 @@ public class ParamUtils {
                 val  = ParameterUtils.convertParameterPlaceholders(val, params);
                 property.setValue(val);
             }
+
+            if (property.getProp().startsWith(START_UP_PARAMS_PREFIX)) {
+                property.setProp(property.getProp().replaceFirst(START_UP_PARAMS_PREFIX, ""));
+                convertedParams.put(property.getProp(), property);
+            } else if (property.getProp().startsWith(GLOBAL_PARAMS_PREFIX)) {
+                String prop = property.getProp().replaceFirst(GLOBAL_PARAMS_PREFIX, "");
+                if (!convertedParams.containsKey(prop)) {
+                    property.setProp(prop);
+                    convertedParams.put(prop, property);
+                }
+            }
         }
 
-        return globalParams;
+        return convertedParams;
     }
 
     /**
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
index 90ee18311a..89ca56faf9 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.spi.task.paramparser;
 
+import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
+
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -92,7 +95,9 @@ public class PlaceholderUtils {
         @Override
         public String resolvePlaceholder(String placeholderName) {
             try {
-                return paramsMap.get(placeholderName);
+                String startUpPlaceholderName = START_UP_PARAMS_PREFIX + placeholderName;
+                String globalPlaceholderName = GLOBAL_PARAMS_PREFIX + placeholderName;
+                return paramsMap.getOrDefault(startUpPlaceholderName, paramsMap.getOrDefault(placeholderName, paramsMap.getOrDefault(globalPlaceholderName, null)));
             } catch (Exception ex) {
                 logger.error("resolve placeholder '{}' in [ {} ]", placeholderName, value, ex);
                 return null;
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
index abe0672241..33e34b37f8 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
@@ -136,6 +136,11 @@ public class Constants {
     public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver";
     public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver";
 
+    /**
+     * temporary parameter prefix
+     */
+    public static final String START_UP_PARAMS_PREFIX = "startup-";
+    public static final String GLOBAL_PARAMS_PREFIX = "global-";
 
     /**
      * validation Query
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 5d9fb80ce1..43859e8da1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -158,7 +158,7 @@ public class ShellTask extends AbstractTaskExecutor {
 
     private String parseScript(String script) {
         // combining local and global parameters
-        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
         if (MapUtils.isEmpty(paramsMap)) {
             paramsMap = new HashMap<>();
         }