You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/12/10 08:48:19 UTC
[incubator-dolphinscheduler] branch 1.3.4-prepare updated:
cherry-pick task_ack_miss (#4198)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
new 656ccab cherry-pick task_ack_miss (#4198)
656ccab is described below
commit 656ccab9302cca7ddc9a37b1c5b4adcec6a1d838
Author: Kirs <ac...@163.com>
AuthorDate: Thu Dec 10 16:48:11 2020 +0800
cherry-pick task_ack_miss (#4198)
---
.../master/processor/queue/TaskResponseService.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 4cd7e8b..d09ed71 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -134,16 +134,17 @@ public class TaskResponseService {
case ACK:
try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
- if (taskInstance != null && !taskInstance.getState().typeIsFinished()) {
- processService.changeTaskState(taskResponseEvent.getState(),
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
+ if (taskInstance != null) {
+ ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
+ processService.changeTaskState(status,
+ taskResponseEvent.getStartTime(),
+ taskResponseEvent.getWorkerAddress(),
+ taskResponseEvent.getExecutePath(),
+ taskResponseEvent.getLogPath(),
+ taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+ DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}catch (Exception e){
logger.error("worker ack master error",e);