You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/12/10 02:57:21 UTC

[incubator-dolphinscheduler] branch dev updated: [FIX-PR-4097][server-master]task ack miss (#4189)

This is an automated email from the ASF dual-hosted git repository.

lgcareer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa19a06  [FIX-PR-4097][server-master]task ack miss (#4189)
aa19a06 is described below

commit aa19a06abbc13b402217b75e8364bb5010671b88
Author: Kirs <ac...@163.com>
AuthorDate: Thu Dec 10 10:57:12 2020 +0800

    [FIX-PR-4097][server-master]task ack miss (#4189)
    
    When a task's successful-execution message arrives earlier than
    its ack message,
    the ack message is discarded,
    so the information carried by the ack is lost
---
 .../server/master/processor/queue/TaskResponseService.java           | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 51ecf45..1b5eddb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -138,8 +138,9 @@ public class TaskResponseService {
             case ACK:
                 try {
                     TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-                    if (taskInstance != null && !taskInstance.getState().typeIsFinished()) {
-                        processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
+                    if (taskInstance != null) {
+                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
+                        processService.changeTaskState(taskInstance, status,
                             taskResponseEvent.getStartTime(),
                             taskResponseEvent.getWorkerAddress(),
                             taskResponseEvent.getExecutePath(),