You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/25 14:19:37 UTC

[dolphinscheduler] branch dev updated: [FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2fa3b41  [FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)
2fa3b41 is described below

commit 2fa3b419a0598c499ae0e9cb39f2402f43718418
Author: kyoty <ec...@gmail.com>
AuthorDate: Wed Aug 25 22:19:28 2021 +0800

    [FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)
    
    * fix the npe in MasterExec
    
    * fix the compile error
---
 .../server/master/runner/MasterExecThread.java          | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 856b833..18d78c1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -46,6 +46,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -525,9 +526,9 @@ public class MasterExecThread implements Runnable {
         return taskInstance;
     }
 
-    public void getPreVarPool(TaskInstance taskInstance,  Set<String> preTask) {
-        Map<String,Property> allProperty = new HashMap<>();
-        Map<String,TaskInstance> allTaskInstance = new HashMap<>();
+    public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
+        Map<String, Property> allProperty = new HashMap<>();
+        Map<String, TaskInstance> allTaskInstance = new HashMap<>();
         if (CollectionUtils.isNotEmpty(preTask)) {
             for (String preTaskName : preTask) {
                 TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
@@ -565,17 +566,17 @@ public class MasterExecThread implements Runnable {
                 TaskInstance otherTask = allTaskInstance.get(proName);
                 if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
                     allProperty.put(proName, thisProperty);
-                    allTaskInstance.put(proName,preTaskInstance);
+                    allTaskInstance.put(proName, preTaskInstance);
                 } else {
                     allProperty.put(proName, otherPro);
                 }
             } else {
                 allProperty.put(proName, thisProperty);
-                allTaskInstance.put(proName,preTaskInstance);
+                allTaskInstance.put(proName, preTaskInstance);
             }
         } else {
             allProperty.put(proName, thisProperty);
-            allTaskInstance.put(proName,preTaskInstance);
+            allTaskInstance.put(proName, preTaskInstance);
         }
     }
 
@@ -947,7 +948,7 @@ public class MasterExecThread implements Runnable {
             if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
                 processAlertManager.sendProcessTimeoutAlert(processInstance,
                         processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
-                        processInstance.getProcessDefinitionVersion()));
+                                processInstance.getProcessDefinitionVersion()));
                 sendTimeWarning = true;
             }
             for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) {
@@ -976,7 +977,9 @@ public class MasterExecThread implements Runnable {
                         task.getName(), task.getId(), task.getState());
                 // node success , post node submit
                 if (task.getState() == ExecutionStatus.SUCCESS) {
+                    ProcessDefinition relatedProcessDefinition = processInstance.getProcessDefinition();
                     processInstance = processService.findProcessInstanceById(processInstance.getId());
+                    processInstance.setProcessDefinition(relatedProcessDefinition);
                     processInstance.setVarPool(task.getVarPool());
                     processService.updateProcessInstance(processInstance);
                     completeTaskList.put(task.getName(), task);