You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/12/10 08:48:19 UTC

[incubator-dolphinscheduler] branch 1.3.4-prepare updated: cherry-pick task_ack_miss (#4198)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
     new 656ccab  cherry-pick task_ack_miss (#4198)
656ccab is described below

commit 656ccab9302cca7ddc9a37b1c5b4adcec6a1d838
Author: Kirs <ac...@163.com>
AuthorDate: Thu Dec 10 16:48:11 2020 +0800

    cherry-pick task_ack_miss (#4198)
---
 .../master/processor/queue/TaskResponseService.java     | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 4cd7e8b..d09ed71 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -134,16 +134,17 @@ public class TaskResponseService {
             case ACK:
                 try {
                     TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-                    if (taskInstance != null && !taskInstance.getState().typeIsFinished()) {
-                        processService.changeTaskState(taskResponseEvent.getState(),
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
+                    if (taskInstance != null) {
+                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
+                        processService.changeTaskState(status,
+                            taskResponseEvent.getStartTime(),
+                            taskResponseEvent.getWorkerAddress(),
+                            taskResponseEvent.getExecutePath(),
+                            taskResponseEvent.getLogPath(),
+                            taskResponseEvent.getTaskInstanceId());
                     }
                     // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
                     channel.writeAndFlush(taskAckCommand.convert2Command());
                 }catch (Exception e){
                     logger.error("worker ack master error",e);