You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ch...@apache.org on 2022/11/07 09:02:26 UTC

[dolphinscheduler] branch 3.0.2-prepare updated: [Improvement][Master] Construct processInstance may NPE when master handling command (#12056) (#12776)

This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
     new 1105a05fae [Improvement][Master] Construct processInstance may NPE when master handling command (#12056) (#12776)
1105a05fae is described below

commit 1105a05fae76a965b1e7e305df0eaf0aad466c11
Author: Eric Gao <er...@gmail.com>
AuthorDate: Mon Nov 7 17:02:20 2022 +0800

    [Improvement][Master] Construct processInstance may NPE when master handling command (#12056) (#12776)
    
    Co-authored-by: xuhaihui <xu...@cmss.chinamobile.com>
    Co-authored-by: xuhhui <sz...@163.com>
    Co-authored-by: xuhaihui <xu...@cmss.chinamobile.com>
---
 .../service/process/ProcessServiceImpl.java        | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 81a70939ac..c552740021 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -909,6 +909,9 @@ public class ProcessServiceImpl implements ProcessService {
             throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
         }
         Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
+        if(cmdParam == null){
+            cmdParam = new HashMap<>();
+        }
         int processInstanceId = command.getProcessInstanceId();
         if (processInstanceId == 0) {
             processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
@@ -918,35 +921,37 @@ public class ProcessServiceImpl implements ProcessService {
                 return null;
             }
         }
-        if (cmdParam != null) {
-            CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
-            // reset global params while repeat running is needed by cmdParam
-            if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
-                setGlobalParamIfCommanded(processDefinition, cmdParam);
-            }
 
-            // time zone
-            String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
+        CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
+        // reset global params while repeat running is needed by cmdParam
+        if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
+            setGlobalParamIfCommanded(processDefinition, cmdParam);
+        }
 
-            // Recalculate global parameters after rerun.
-            processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+        // time zone
+        String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
+
+        // Recalculate global parameters after rerun.
+        String globalParams = ParameterUtils.curingGlobalParams(
                 processDefinition.getGlobalParamMap(),
                 processDefinition.getGlobalParamList(),
                 commandTypeIfComplement,
-                processInstance.getScheduleTime(), timezoneId));
-            processInstance.setProcessDefinition(processDefinition);
-        }
-        //reset command parameter
+                processInstance.getScheduleTime(), timezoneId);
+        processInstance.setGlobalParams(globalParams);
+        processInstance.setProcessDefinition(processDefinition);
+
+        // reset command parameter
         if (processInstance.getCommandParam() != null) {
             Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+            Map<String, String> finalCmdParam = cmdParam;
             processCmdParam.forEach((key, value) -> {
-                if (!cmdParam.containsKey(key)) {
-                    cmdParam.put(key, value);
+                if (!finalCmdParam.containsKey(key)) {
+                    finalCmdParam.put(key, value);
                 }
             });
         }
         // reset command parameter if sub process
-        if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
+        if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
             processInstance.setCommandParam(command.getCommandParam());
         }
         if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
@@ -995,7 +1000,9 @@ public class ProcessServiceImpl implements ProcessService {
                     // initialize the pause state
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
-                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING, String.join(",", convertIntListToString(suspendedNodeList)));
+
+                cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
+                    String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
                 processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;