You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/02/17 07:10:29 UTC

[dolphinscheduler] branch 2.0.4-prepare updated: [Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new 9bfff90  [Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)
9bfff90 is described below

commit 9bfff90c66d7e7c2ef52d5f291159cc95d5b540b
Author: xiangzihao <46...@qq.com>
AuthorDate: Thu Feb 17 15:08:26 2022 +0800

    [Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)
    
    * fix bug_8367
    
    * fix bug_8367
    
    * fix bug_8367
    
    * fix bug_8367
---
 .../master/runner/WorkflowExecuteThread.java       | 52 ++++++++++++++--------
 1 file changed, 34 insertions(+), 18 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index f08cb82..731063b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -480,28 +481,43 @@ public class WorkflowExecuteThread implements Runnable {
                     processInstance.getScheduleTime(),
                     complementListDate.toString());
             scheduleDate = complementListDate.get(index + 1);
-            //the next process complement
-            processInstance.setId(0);
+
+        }
+
+        //the next process complement
+        int create = this.createComplementDataCommand(scheduleDate);
+        if (create > 0) {
+            logger.info("create complement data command successfully. process id: {}", processInstance.getId());
         }
-        processInstance.setScheduleTime(scheduleDate);
+
+        return true;
+    }
+
+    private int createComplementDataCommand(Date scheduleDate) {
+        Command command = new Command();
+        command.setScheduleTime(scheduleDate);
+        command.setCommandType(CommandType.COMPLEMENT_DATA);
+        command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
         if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
-            processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
-        }
-
-        processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-                processDefinition.getGlobalParamMap(),
-                processDefinition.getGlobalParamList(),
-                CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
-        processInstance.setStartTime(new Date());
-        processInstance.setRestartTime(processInstance.getStartTime());
-        processInstance.setEndTime(null);
-        processService.saveProcessInstance(processInstance);
-        this.taskInstanceHashMap.clear();
-        startProcess();
-        return true;
+        }
+        cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss"));
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+        command.setTaskDependType(processInstance.getTaskDependType());
+        command.setFailureStrategy(processInstance.getFailureStrategy());
+        command.setWarningType(processInstance.getWarningType());
+        command.setWarningGroupId(processInstance.getWarningGroupId());
+        command.setStartTime(new Date());
+        command.setExecutorId(processInstance.getExecutorId());
+        command.setUpdateTime(new Date());
+        command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+        command.setWorkerGroup(processInstance.getWorkerGroup());
+        command.setEnvironmentCode(processInstance.getEnvironmentCode());
+        command.setDryRun(processInstance.getDryRun());
+        command.setProcessInstanceId(0);
+        command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+        return processService.createCommand(command);
     }
 
     private boolean needComplementProcess() {