You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/12/25 05:35:45 UTC
[dolphinscheduler] branch 2.0.8-prepare updated: [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch 2.0.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.8-prepare by this push:
new 4515d3191f [Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)
4515d3191f is described below
commit 4515d3191fbb018bb215f6eca3c3a7d48f9f1d37
Author: Aaron Wang <wa...@gmail.com>
AuthorDate: Sun Dec 25 13:35:33 2022 +0800
[Bug-12960][Master] Fix that start parameters cannot update global variables (#13005)
* fix start params override global params bug
* set startup parameters the highest priority
* add global parameter prefix
* fix import
---
.../apache/dolphinscheduler/common/Constants.java | 6 +++
.../master/runner/WorkflowExecuteThread.java | 43 +++++++++++++++++++++-
.../service/process/ProcessService.java | 28 ++++++++++----
.../spi/task/paramparser/ParamUtils.java | 31 +++++++++++++---
.../spi/task/paramparser/PlaceholderUtils.java | 7 +++-
.../dolphinscheduler/spi/utils/Constants.java | 5 +++
.../plugin/task/shell/ShellTask.java | 2 +-
7 files changed, 105 insertions(+), 17 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 1f6558a0db..70fab13f76 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -935,6 +935,12 @@ public final class Constants {
*/
public static final String LOCALE_LANGUAGE = "language";
+ /**
+ * temporary parameter prefix
+ */
+ public static final String START_UP_PARAMS_PREFIX = "startup-";
+ public static final String GLOBAL_PARAMS_PREFIX = "global-";
+
/**
* driver
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 8a0f5f2986..8f96c20c7d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -23,11 +23,15 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_ST
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
+import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.common.enums.Direct.IN;
+import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
-import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -688,6 +692,9 @@ public class WorkflowExecuteThread implements Runnable {
if (processInstance.isComplementData() && complementListDate.size() == 0) {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ // reset global params while there are start parameters
+ setGlobalParamIfCommanded(processDefinition, cmdParam);
+
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
@@ -944,7 +951,7 @@ public class WorkflowExecuteThread implements Runnable {
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN.
- thisProperty.setDirect(Direct.IN);
+ thisProperty.setDirect(IN);
//get the pre taskInstance Property's name
String proName = thisProperty.getProp();
//if the Previous nodes have the Property of same name
@@ -1629,4 +1636,36 @@ public class WorkflowExecuteThread implements Runnable {
public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
return activeTaskProcessorMaps;
}
+
+ private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
+ // get start params from command param
+ Map<String, String> startParamMap = new HashMap<>();
+ if (cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+ String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+ startParamMap = JSONUtils.toMap(startParamJson);
+ }
+ Map<String, String> fatherParamMap = new HashMap<>();
+ if (cmdParam.containsKey(Constants.CMD_PARAM_FATHER_PARAMS)) {
+ String fatherParamJson = cmdParam.get(Constants.CMD_PARAM_FATHER_PARAMS);
+ fatherParamMap = JSONUtils.toMap(fatherParamJson);
+ }
+ startParamMap.putAll(fatherParamMap);
+ Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+ List<Property> globalParamList = processDefinition.getGlobalParamList();
+ if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
+ Map<String, String> tempGlobalMap = new HashMap<>();
+ // add prefix for global params
+ for (Map.Entry<String, String> param : globalMap.entrySet()) {
+ tempGlobalMap.put(GLOBAL_PARAMS_PREFIX+ param.getKey(), param.getValue());
+ }
+ globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp()));
+ // set start param into global params, add prefix for startup params
+ for (Entry<String, String> startParam : startParamMap.entrySet()) {
+ String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
+ tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
+ globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
+ }
+ processDefinition.setGlobalParamMap(tempGlobalMap);
+ }
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b79e31bee1..1d017eb4db 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -26,9 +26,14 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.START_UP_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS_PREFIX;
import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.common.enums.Direct.IN;
+import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -653,15 +658,22 @@ public class ProcessService {
fatherParamMap = JSONUtils.toMap(fatherParamJson);
}
startParamMap.putAll(fatherParamMap);
- // set start param into global params
- if (startParamMap.size() > 0
- && processDefinition.getGlobalParamMap() != null) {
- for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
- String val = startParamMap.get(param.getKey());
- if (val != null) {
- param.setValue(val);
- }
+ Map<String, String> globalMap = processDefinition.getGlobalParamMap();
+ List<Property> globalParamList = processDefinition.getGlobalParamList();
+ if (MapUtils.isNotEmpty(startParamMap) && globalMap != null) {
+ Map<String, String> tempGlobalMap = new HashMap<>();
+ // add prefix for global params
+ for (Map.Entry<String, String> param : globalMap.entrySet()) {
+ tempGlobalMap.put(GLOBAL_PARAMS_PREFIX + param.getKey(), param.getValue());
+ }
+ globalParamList.forEach(property -> property.setProp(GLOBAL_PARAMS_PREFIX + property.getProp()));
+ // set start param into global params, add prefix for startup params
+ for (Entry<String, String> startParam : startParamMap.entrySet()) {
+ String tmpStartParamKey = START_UP_PARAMS_PREFIX + startParam.getKey();
+ tempGlobalMap.put(tmpStartParamKey, startParam.getValue());
+ globalParamList.add(new Property(tmpStartParamKey, IN, VARCHAR, startParam.getValue()));
}
+ processDefinition.setGlobalParamMap(tempGlobalMap);
}
}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
index 069f941ff8..a96fd29883 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/ParamUtils.java
@@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.spi.task.paramparser;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
-
+import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
+import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.spi.enums.CommandType;
import org.apache.dolphinscheduler.spi.enums.DataType;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
@@ -60,12 +62,14 @@ public class ParamUtils {
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
+ Map<String, Property> convertedParams = new HashMap<>();
+
// combining local and global parameters
Map<String, Property> localParams = parameters.getLocalParametersMap();
Map<String, Property> varParams = parameters.getVarPoolMap();
- if (globalParams == null && localParams == null) {
+ if (MapUtils.isEmpty(globalParams) && MapUtils.isEmpty(localParams)) {
return null;
}
// if it is a complement,
@@ -75,8 +79,7 @@ public class ParamUtils {
.getBusinessTime(commandType,
scheduleTime);
- if (globalParamsMap != null) {
-
+ if (MapUtils.isNotEmpty(globalParamsMap)) {
params.putAll(globalParamsMap);
}
@@ -87,12 +90,19 @@ public class ParamUtils {
if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
+ for (Map.Entry<String, Property> entry : localParams.entrySet()) {
+ convertedParams.put(entry.getKey(), entry.getValue());
+ }
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
+ convertedParams = localParams;
}
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
+ for (Map.Entry<String, Property> entry : varParams.entrySet()) {
+ convertedParams.put(entry.getKey(), entry.getValue());
+ }
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
@@ -111,9 +121,20 @@ public class ParamUtils {
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
+
+ if (property.getProp().startsWith(START_UP_PARAMS_PREFIX)) {
+ property.setProp(property.getProp().replaceFirst(START_UP_PARAMS_PREFIX, ""));
+ convertedParams.put(property.getProp(), property);
+ } else if (property.getProp().startsWith(GLOBAL_PARAMS_PREFIX)) {
+ String prop = property.getProp().replaceFirst(GLOBAL_PARAMS_PREFIX, "");
+ if (!convertedParams.containsKey(prop)) {
+ property.setProp(prop);
+ convertedParams.put(prop, property);
+ }
+ }
}
- return globalParams;
+ return convertedParams;
}
/**
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
index 90ee18311a..89ca56faf9 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/paramparser/PlaceholderUtils.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.spi.task.paramparser;
+import static org.apache.dolphinscheduler.spi.utils.Constants.GLOBAL_PARAMS_PREFIX;
+import static org.apache.dolphinscheduler.spi.utils.Constants.START_UP_PARAMS_PREFIX;
+
import java.util.Map;
import org.slf4j.Logger;
@@ -92,7 +95,9 @@ public class PlaceholderUtils {
@Override
public String resolvePlaceholder(String placeholderName) {
try {
- return paramsMap.get(placeholderName);
+ String startUpPlaceholderName = START_UP_PARAMS_PREFIX + placeholderName;
+ String globalPlaceholderName = GLOBAL_PARAMS_PREFIX + placeholderName;
+ return paramsMap.getOrDefault(startUpPlaceholderName, paramsMap.getOrDefault(placeholderName, paramsMap.getOrDefault(globalPlaceholderName, null)));
} catch (Exception ex) {
logger.error("resolve placeholder '{}' in [ {} ]", placeholderName, value, ex);
return null;
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
index abe0672241..33e34b37f8 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/Constants.java
@@ -136,6 +136,11 @@ public class Constants {
public static final String COM_DB2_JDBC_DRIVER = "com.ibm.db2.jcc.DB2Driver";
public static final String COM_PRESTO_JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver";
+ /**
+ * temporary parameter prefix
+ */
+ public static final String START_UP_PARAMS_PREFIX = "startup-";
+ public static final String GLOBAL_PARAMS_PREFIX = "global-";
/**
* validation Query
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 5d9fb80ce1..43859e8da1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -158,7 +158,7 @@ public class ShellTask extends AbstractTaskExecutor {
private String parseScript(String script) {
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}