You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/02/17 07:10:29 UTC
[dolphinscheduler] branch 2.0.4-prepare updated: [Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
new 9bfff90 [Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)
9bfff90 is described below
commit 9bfff90c66d7e7c2ef52d5f291159cc95d5b540b
Author: xiangzihao <46...@qq.com>
AuthorDate: Thu Feb 17 15:08:26 2022 +0800
[Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)
* fix bug_8367
* fix bug_8367
* fix bug_8367
* fix bug_8367
---
.../master/runner/WorkflowExecuteThread.java | 52 ++++++++++++++--------
1 file changed, 34 insertions(+), 18 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index f08cb82..731063b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -480,28 +481,43 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getScheduleTime(),
complementListDate.toString());
scheduleDate = complementListDate.get(index + 1);
- //the next process complement
- processInstance.setId(0);
+
+ }
+
+ //the next process complement
+ int create = this.createComplementDataCommand(scheduleDate);
+ if (create > 0) {
+ logger.info("create complement data command successfully. process id: {}", processInstance.getId());
}
- processInstance.setScheduleTime(scheduleDate);
+
+ return true;
+ }
+
+ private int createComplementDataCommand(Date scheduleDate) {
+ Command command = new Command();
+ command.setScheduleTime(scheduleDate);
+ command.setCommandType(CommandType.COMPLEMENT_DATA);
+ command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
- processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
- }
-
- processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
- processInstance.setStartTime(new Date());
- processInstance.setRestartTime(processInstance.getStartTime());
- processInstance.setEndTime(null);
- processService.saveProcessInstance(processInstance);
- this.taskInstanceHashMap.clear();
- startProcess();
- return true;
+ }
+ cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss"));
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ command.setTaskDependType(processInstance.getTaskDependType());
+ command.setFailureStrategy(processInstance.getFailureStrategy());
+ command.setWarningType(processInstance.getWarningType());
+ command.setWarningGroupId(processInstance.getWarningGroupId());
+ command.setStartTime(new Date());
+ command.setExecutorId(processInstance.getExecutorId());
+ command.setUpdateTime(new Date());
+ command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
+ command.setWorkerGroup(processInstance.getWorkerGroup());
+ command.setEnvironmentCode(processInstance.getEnvironmentCode());
+ command.setDryRun(processInstance.getDryRun());
+ command.setProcessInstanceId(0);
+ command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
+ return processService.createCommand(command);
}
private boolean needComplementProcess() {