You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/13 14:32:35 UTC

[dolphinscheduler] branch dev updated: Fix kill yarn job error during failover caused by ProcessDefinition not being set (#10948)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b245e7c973 Fix kill yarn job error during failover caused by ProcessDefinition not being set (#10948)
b245e7c973 is described below

commit b245e7c973a5282a1405097605a4afacac1b5eab
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Jul 13 22:32:30 2022 +0800

    Fix kill yarn job error during failover caused by ProcessDefinition not being set (#10948)
---
 .../master/service/MasterFailoverService.java       | 21 +++++++++++++++------
 .../master/service/WorkerFailoverService.java       |  9 +++++----
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index c7e5b4ea13..61ba7c3fd6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -114,16 +115,18 @@ public class MasterFailoverService {
      * @param masterHost master host
      */
     private void doFailoverMaster(@NonNull String masterHost) {
-        LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
         StopWatch failoverTimeCost = StopWatch.createStarted();
 
-        Optional<Date> masterStartupTimeOptional =
-            getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
-        List<ProcessInstance> needFailoverProcessInstanceList =
-            processService.queryNeedFailoverProcessInstances(masterHost);
+        Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
+                                                                        masterHost);
+        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
+            masterHost);
+        if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
+            return;
+        }
 
         LOGGER.info(
-            "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
+            "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
             masterHost,
             needFailoverProcessInstanceList.size(),
             needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
@@ -136,6 +139,11 @@ public class MasterFailoverService {
                     LOGGER.info("WorkflowInstance doesn't need to failover");
                     continue;
                 }
+                // todo: use batch query
+                ProcessDefinition processDefinition
+                    = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                                                           processInstance.getProcessDefinitionVersion());
+                processInstance.setProcessDefinition(processDefinition);
                 int processInstanceId = processInstance.getId();
                 List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
                 for (TaskInstance taskInstance : taskInstanceList) {
@@ -205,6 +213,7 @@ public class MasterFailoverService {
             TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
+                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                 .create();
 
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index ec126a3ec3..402ec43354 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -115,10 +115,10 @@ public class WorkerFailoverService {
         for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
             LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
             try {
-                ProcessInstance processInstance =
-                    processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
-                        WorkflowExecuteRunnable workflowExecuteRunnable =
-                            cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
+                ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
+                    taskInstance.getProcessInstanceId(), k -> {
+                        WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
+                            taskInstance.getProcessInstanceId());
                         if (workflowExecuteRunnable == null) {
                             return null;
                         }
@@ -167,6 +167,7 @@ public class WorkerFailoverService {
             TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
+                .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
                 .create();
 
             if (masterConfig.isKillYarnJobWhenTaskFailover()) {