You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/25 14:19:37 UTC
[dolphinscheduler] branch dev updated: [FIX-5908][MasterServer]
When executing an compensation task,
the execution thread would have a NPE (#5909)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2fa3b41 [FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)
2fa3b41 is described below
commit 2fa3b419a0598c499ae0e9cb39f2402f43718418
Author: kyoty <ec...@gmail.com>
AuthorDate: Wed Aug 25 22:19:28 2021 +0800
[FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)
* fix the npe in MasterExec
* fix the compile error
---
.../server/master/runner/MasterExecThread.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 856b833..18d78c1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -46,6 +46,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -525,9 +526,9 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
- public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
- Map<String,Property> allProperty = new HashMap<>();
- Map<String,TaskInstance> allTaskInstance = new HashMap<>();
+ public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
+ Map<String, Property> allProperty = new HashMap<>();
+ Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskName : preTask) {
TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
@@ -565,17 +566,17 @@ public class MasterExecThread implements Runnable {
TaskInstance otherTask = allTaskInstance.get(proName);
if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName,preTaskInstance);
+ allTaskInstance.put(proName, preTaskInstance);
} else {
allProperty.put(proName, otherPro);
}
} else {
allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName,preTaskInstance);
+ allTaskInstance.put(proName, preTaskInstance);
}
} else {
allProperty.put(proName, thisProperty);
- allTaskInstance.put(proName,preTaskInstance);
+ allTaskInstance.put(proName, preTaskInstance);
}
}
@@ -947,7 +948,7 @@ public class MasterExecThread implements Runnable {
if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
processAlertManager.sendProcessTimeoutAlert(processInstance,
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion()));
+ processInstance.getProcessDefinitionVersion()));
sendTimeWarning = true;
}
for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) {
@@ -976,7 +977,9 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState());
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
+ ProcessDefinition relatedProcessDefinition = processInstance.getProcessDefinition();
processInstance = processService.findProcessInstanceById(processInstance.getId());
+ processInstance.setProcessDefinition(relatedProcessDefinition);
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);