You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/09/17 02:07:09 UTC

[incubator-dolphinscheduler] branch 1.3.3-release updated: [Fix-3616][Server] when worker akc/response master exception , async retry (#3748)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch 1.3.3-release
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.3-release by this push:
     new 29f1123  [Fix-3616][Server] when worker akc/response master exception , async retry (#3748)
29f1123 is described below

commit 29f1123d1d95ddaae7b531d704fb2cc372d0cda2
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Thu Sep 17 10:06:59 2020 +0800

    [Fix-3616][Server] when worker akc/response master exception , async retry (#3748)
---
 .github/workflows/ci_e2e.yml                       |  10 +-
 .../dolphinscheduler/common/enums/Event.java       |  23 +++++
 .../remote/command/CommandType.java                | 111 ++++++++++++++++++++-
 .../remote/command/DBTaskAckCommand.java           |  72 +++++++++++++
 .../remote/command/DBTaskResponseCommand.java      |  71 +++++++++++++
 .../server/master/processor/TaskAckProcessor.java  |  25 +----
 .../master/processor/TaskResponseProcessor.java    |  26 +----
 .../master/processor/queue/TaskResponseEvent.java  |  33 +++++-
 .../processor/queue/TaskResponseService.java       |  55 +++++++---
 .../server/worker/WorkerServer.java                |  11 ++
 .../server/worker/cache/ResponceCache.java         |  94 +++++++++++++++++
 .../worker/processor/DBTaskAckProcessor.java       |  56 +++++++++++
 .../worker/processor/DBTaskResponseProcessor.java  |  58 +++++++++++
 .../worker/processor/NettyRemoteChannel.java       |   8 ++
 .../worker/processor/TaskCallbackService.java      |  31 +++---
 .../worker/processor/TaskExecuteProcessor.java     |  10 +-
 .../worker/runner/RetryReportTaskStatusThread.java |  92 +++++++++++++++++
 .../server/worker/runner/TaskExecuteThread.java    |  13 ++-
 .../processor/queue/TaskResponseServiceTest.java   |   4 +-
 19 files changed, 705 insertions(+), 98 deletions(-)

diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml
index 82c81ef..ebd5364 100644
--- a/.github/workflows/ci_e2e.yml
+++ b/.github/workflows/ci_e2e.yml
@@ -58,7 +58,9 @@ jobs:
           wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
           sudo dpkg -i google-chrome*.deb
           sudo apt-get install -f -y
-          wget -N https://chromedriver.storage.googleapis.com/83.0.4103.39/chromedriver_linux64.zip
+          google-chrome -version
+          googleVersion=$(curl -s https://chromedriver.storage.googleapis.com/LATEST_RELEASE)
+          wget -N https://chromedriver.storage.googleapis.com/${googleVersion}/chromedriver_linux64.zip
           unzip chromedriver_linux64.zip
           sudo mv -f chromedriver /usr/local/share/chromedriver
           sudo ln -s /usr/local/share/chromedriver /usr/local/bin/chromedriver
@@ -66,9 +68,7 @@ jobs:
         run: cd ./e2e && mvn -B clean test
       - name: Collect logs
         if: failure()
-        uses: actions/upload-artifact@v1
+        uses: actions/upload-artifact@v2
         with:
           name: dslogs
-          path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data
-
-
+          path: ${{ github.workspace }}/docker/docker-swarm/dolphinscheduler-logs
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
new file mode 100644
index 0000000..9cec276
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+public enum Event {
+    ACK,
+    RESULT;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65..17c1e44 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1,110 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     * remove task log request,
     */
    REMOVE_TAK_LOG_REQUEST,

    /**
     * remove task log respons
 e
     */
    REMOVE_TAK_LOG_RESPONSE,

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    TASK_EXECUTE_REQUEST,

    /**
     * execute task ack
     */
    TASK_EXECUTE_ACK,

    /**
     * execute task response
     */
    TASK_EXECUTE_RESPONSE,

    /**
     * kill task
     */
    TASK_KILL_REQUEST,

    /**
     * kill task response
     */
    TASK_KILL_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+public enum CommandType {
+
+    /**
+     * remove task log request,
+     */
+    REMOVE_TAK_LOG_REQUEST,
+
+    /**
+     * remove task log response
+     */
+    REMOVE_TAK_LOG_RESPONSE,
+
+    /**
+     *  roll view log request
+     */
+    ROLL_VIEW_LOG_REQUEST,
+
+    /**
+     *  roll view log response
+     */
+    ROLL_VIEW_LOG_RESPONSE,
+
+    /**
+     * view whole log request
+     */
+    VIEW_WHOLE_LOG_REQUEST,
+
+    /**
+     * view whole log response
+     */
+    VIEW_WHOLE_LOG_RESPONSE,
+
+    /**
+     * get log bytes request
+     */
+    GET_LOG_BYTES_REQUEST,
+
+    /**
+     * get log bytes response
+     */
+    GET_LOG_BYTES_RESPONSE,
+
+
+    WORKER_REQUEST,
+    MASTER_RESPONSE,
+
+    /**
+     * execute task request
+     */
+    TASK_EXECUTE_REQUEST,
+
+    /**
+     * execute task ack
+     */
+    TASK_EXECUTE_ACK,
+
+    /**
+     * execute task response
+     */
+    TASK_EXECUTE_RESPONSE,
+
+    /**
+     * db task ack
+     */
+    DB_TASK_ACK,
+
+    /**
+     * db task response
+     */
+    DB_TASK_RESPONSE,
+
+    /**
+     * kill task
+     */
+    TASK_KILL_REQUEST,
+
+    /**
+     * kill task response
+     */
+    TASK_KILL_RESPONSE,
+
+    /**
+     *  ping
+     */
+    PING,
+
+    /**
+     *  pong
+     */
+    PONG;
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
new file mode 100644
index 0000000..f37eb97
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ *  db task ack request command
+ */
+public class DBTaskAckCommand implements Serializable {
+
+    private int taskInstanceId;
+    private int status;
+
+    public DBTaskAckCommand(int status,int taskInstanceId) {
+        this.status = status;
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getTaskInstanceId() {
+        return taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    /**
+     * package this object as a DB_TASK_ACK command
+     * @return command
+     */
+    public Command convert2Command(){
+        Command command = new Command();
+        command.setType(CommandType.DB_TASK_ACK);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+
+
+    @Override
+    public String toString() {
+        return "DBTaskAckCommand{" +
+                "taskInstanceId=" + taskInstanceId +
+                ", status=" + status +
+                '}';
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
new file mode 100644
index 0000000..a640298
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ *  db task final result response command
+ */
+public class DBTaskResponseCommand implements Serializable {
+
+    private int taskInstanceId;
+    private int status;
+
+    public DBTaskResponseCommand(int status,int taskInstanceId) {
+        this.status = status;
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public int getTaskInstanceId() {
+        return taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    /**
+     * package response command
+     * @return command
+     */
+    public Command convert2Command(){
+        Command command = new Command();
+        command.setType(CommandType.DB_TASK_RESPONSE);
+        byte[] body = FastJsonSerializer.serialize(this);
+        command.setBody(body);
+        return command;
+    }
+
+    @Override
+    public String toString() {
+        return "DBTaskResponseCommand{" +
+                "taskInstanceId=" + taskInstanceId +
+                ", status=" + status +
+                '}';
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 7bc3759..9ca9645 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -34,11 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
 
 /**
  *  task ack processor
@@ -57,16 +52,9 @@ public class TaskAckProcessor implements NettyRequestProcessor {
      */
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
-
-    /**
-     * processService
-     */
-    private ProcessService processService;
-
     public TaskAckProcessor(){
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**
@@ -92,19 +80,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
                 workerAddress,
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
-                taskAckCommand.getTaskInstanceId());
+                taskAckCommand.getTaskInstanceId(),
+                channel);
 
         taskResponseService.addResponse(taskResponseEvent);
-
-        while (Stopper.isRunning()){
-            TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
-
-            if (taskInstance != null && ackStatus.typeIsRunning()){
-                break;
-            }
-            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-        }
-
     }
 
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 721b146..5982f6c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -19,10 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -33,11 +30,9 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
 import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.dolphinscheduler.common.Constants.*;
 
 /**
  *  task response processor
@@ -56,15 +51,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
-    /**
-     * processService
-     */
-    private ProcessService processService;
-
     public TaskResponseProcessor(){
         this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**
@@ -83,25 +72,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
 
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
 
-        ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());
-
         // TaskResponseEvent
         TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
                 responseCommand.getProcessId(),
                 responseCommand.getAppIds(),
-                responseCommand.getTaskInstanceId());
-
+                responseCommand.getTaskInstanceId(),
+                channel);
         taskResponseService.addResponse(taskResponseEvent);
-
-        while (Stopper.isRunning()){
-            TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-
-            if (taskInstance != null && responseStatus.typeIsFinished()){
-                break;
-            }
-            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
-        }
     }
 
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 9e8813f..494d95f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 
 import java.util.Date;
@@ -76,7 +78,18 @@ public class TaskResponseEvent {
      */
     private Event event;
 
-    public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
+    /**
+     * channel
+     */
+    private Channel channel;
+
+    public static TaskResponseEvent newAck(ExecutionStatus state,
+                                           Date startTime,
+                                           String workerAddress,
+                                           String executePath,
+                                           String logPath,
+                                           int taskInstanceId,
+                                           Channel channel){
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setStartTime(startTime);
@@ -85,10 +98,16 @@ public class TaskResponseEvent {
         event.setLogPath(logPath);
         event.setTaskInstanceId(taskInstanceId);
         event.setEvent(Event.ACK);
+        event.setChannel(channel);
         return event;
     }
 
-    public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
+    public static TaskResponseEvent newResult(ExecutionStatus state,
+                                              Date endTime,
+                                              int processId,
+                                              String appIds,
+                                              int taskInstanceId,
+                                              Channel channel){
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setEndTime(endTime);
@@ -96,6 +115,7 @@ public class TaskResponseEvent {
         event.setAppIds(appIds);
         event.setTaskInstanceId(taskInstanceId);
         event.setEvent(Event.RESULT);
+        event.setChannel(channel);
         return event;
     }
 
@@ -179,8 +199,11 @@ public class TaskResponseEvent {
         this.event = event;
     }
 
-    public enum Event{
-        ACK,
-        RESULT;
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index b9772ca..f365db7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -17,7 +17,13 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,23 +127,48 @@ public class TaskResponseService {
      * @param taskResponseEvent taskResponseEvent
      */
     private void persist(TaskResponseEvent taskResponseEvent){
-        TaskResponseEvent.Event event = taskResponseEvent.getEvent();
+        Event event = taskResponseEvent.getEvent();
+        Channel channel = taskResponseEvent.getChannel();
 
         switch (event){
             case ACK:
-                processService.changeTaskState(taskResponseEvent.getState(),
-                        taskResponseEvent.getStartTime(),
-                        taskResponseEvent.getWorkerAddress(),
-                        taskResponseEvent.getExecutePath(),
-                        taskResponseEvent.getLogPath(),
-                        taskResponseEvent.getTaskInstanceId());
+                try {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+                    if (taskInstance != null){
+                        processService.changeTaskState(taskResponseEvent.getState(),
+                                taskResponseEvent.getStartTime(),
+                                taskResponseEvent.getWorkerAddress(),
+                                taskResponseEvent.getExecutePath(),
+                                taskResponseEvent.getLogPath(),
+                                taskResponseEvent.getTaskInstanceId());
+                    }
+                    // if taskInstance is null (it may have been deleted), retrying would be meaningless, so ack success anyway
+                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+                    channel.writeAndFlush(taskAckCommand.convert2Command());
+                }catch (Exception e){
+                    logger.error("worker ack master error",e);
+                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(),-1);
+                    channel.writeAndFlush(taskAckCommand.convert2Command());
+                }
                 break;
             case RESULT:
-                processService.changeTaskState(taskResponseEvent.getState(),
-                        taskResponseEvent.getEndTime(),
-                        taskResponseEvent.getProcessId(),
-                        taskResponseEvent.getAppIds(),
-                        taskResponseEvent.getTaskInstanceId());
+                try {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+                    if (taskInstance != null){
+                        processService.changeTaskState(taskResponseEvent.getState(),
+                                taskResponseEvent.getEndTime(),
+                                taskResponseEvent.getProcessId(),
+                                taskResponseEvent.getAppIds(),
+                                taskResponseEvent.getTaskInstanceId());
+                    }
+                    // if taskInstance is null (it may have been deleted), retrying would be meaningless, so respond success anyway
+                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(),taskResponseEvent.getTaskInstanceId());
+                    channel.writeAndFlush(taskResponseCommand.convert2Command());
+                }catch (Exception e){
+                    logger.error("worker response master error",e);
+                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(),-1);
+                    channel.writeAndFlush(taskResponseCommand.convert2Command());
+                }
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f0833cb..6895de3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -22,9 +22,12 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +73,9 @@ public class WorkerServer {
     @Autowired
     private SpringApplicationContext springApplicationContext;
 
+    @Autowired
+    private RetryReportTaskStatusThread retryReportTaskStatusThread;
+
     /**
      * worker server startup
      *
@@ -95,11 +101,16 @@ public class WorkerServer {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
         this.nettyRemotingServer.start();
 
         // worker registry
         this.workerRegistry.registry();
 
+        // retry report task status
+        this.retryReportTaskStatusThread.start();
+
         /**
          * register hooks, which are called before the process exits
          */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
new file mode 100644
index 0000000..3639b8e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.cache;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.remote.command.Command;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Response cache : remembers, per task instance id, the commands the worker
+ * sends to the master. ACK commands and RESULT commands live in two separate
+ * maps; an entry stays until it is removed through
+ * {@link #removeAckCache(Integer)} or {@link #removeResponseCache(Integer)}.
+ */
+public class ResponceCache {
+
+    /**
+     * the single shared instance, handed out by {@link #get()}
+     */
+    private static final ResponceCache instance = new ResponceCache();
+
+    /**
+     * task instance id -> cached ACK command
+     */
+    private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
+
+    /**
+     * task instance id -> cached RESULT command
+     */
+    private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+
+    private ResponceCache(){}
+
+    /**
+     * get the shared cache
+     * @return the shared ResponceCache instance
+     */
+    public static ResponceCache get(){
+        return instance;
+    }
+
+    /**
+     * cache a command, replacing whatever was stored for the same
+     * task instance id in the map selected by the event
+     * @param taskInstanceId taskInstanceId
+     * @param command command
+     * @param event ACK selects the ack map, RESULT selects the response map;
+     *              any other event raises IllegalArgumentException
+     */
+    public void cache(Integer taskInstanceId, Command command, Event event){
+        final Map<Integer,Command> target;
+        switch (event){
+            case ACK:
+                target = ackCache;
+                break;
+            case RESULT:
+                target = responseCache;
+                break;
+            default:
+                throw new IllegalArgumentException("invalid event type : " + event);
+        }
+        target.put(taskInstanceId,command);
+    }
+
+    /**
+     * drop the cached ACK command of a task instance, if there is one
+     * @param taskInstanceId taskInstanceId
+     */
+    public void removeAckCache(Integer taskInstanceId){
+        ackCache.remove(taskInstanceId);
+    }
+
+    /**
+     * drop the cached RESULT command of a task instance, if there is one
+     * @param taskInstanceId taskInstanceId
+     */
+    public void removeResponseCache(Integer taskInstanceId){
+        responseCache.remove(taskInstanceId);
+    }
+
+    /**
+     * get the ack map
+     * @return the live ack map itself, not a copy
+     */
+    public Map<Integer,Command> getAckCache(){
+        return ackCache;
+    }
+
+    /**
+     * get the response map
+     * @return the live response map itself, not a copy
+     */
+    public Map<Integer,Command> getResponseCache(){
+        return responseCache;
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
new file mode 100644
index 0000000..ff0fd8e
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  db task ack processor
+ */
+public class DBTaskAckProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
+
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
+                String.format("invalid command type : %s", command.getType()));
+
+        DBTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
+                command.getBody(), DBTaskAckCommand.class);
+
+        if (taskAckCommand == null){
+            return;
+        }
+
+        if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
+            ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+        }
+    }
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
new file mode 100644
index 0000000..126defd
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  db task response processor
+ */
+public class DBTaskResponseProcessor implements NettyRequestProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
+
+
+    @Override
+    public void process(Channel channel, Command command) {
+        Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
+                String.format("invalid command type : %s", command.getType()));
+
+        DBTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
+                command.getBody(), DBTaskResponseCommand.class);
+
+        if (taskResponseCommand == null){
+            return;
+        }
+
+        if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
+            ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+        }
+    }
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
index cbb8972..9762b10 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
 
+import java.util.Random;
+
 /**
  *  callback channel
  */
@@ -50,6 +52,12 @@ public class NettyRemoteChannel {
         this.opaque = opaque;
     }
 
+    public NettyRemoteChannel(Channel channel) { // builds a callback channel from a bare channel, without a caller-supplied opaque
+        this.channel = channel;
+        this.host = ChannelUtils.toAddress(channel); // host is derived from the channel itself
+        this.opaque = -1; // NOTE(review): -1 presumably means "no originating request to correlate with" — confirm
+    }
+
     public Channel getChannel() {
         return channel;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 6a23a9e..ca7d3c6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -86,20 +86,19 @@ public class TaskCallbackService {
      * @return callback channel
      */
     private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
+        Channel newChannel;
         NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
-        if(nettyRemoteChannel == null){
-            throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first");
-        }
-        if(nettyRemoteChannel.isActive()){
-            return nettyRemoteChannel;
-        }
-        Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
-        if(newChannel != null){
-            return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+        if(nettyRemoteChannel != null){
+            if(nettyRemoteChannel.isActive()){
+                return nettyRemoteChannel;
+            }
+            newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
+            if(newChannel != null){
+                return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+            }
+
         }
-        logger.warn("original master : {} for task : {} is not reachable, random select master",
-                nettyRemoteChannel.getHost(),
-                taskInstanceId);
+
         Set<String> masterNodes = null;
         int ntries = 0;
         while (Stopper.isRunning()) {
@@ -119,7 +118,7 @@ public class TaskCallbackService {
             for (String masterNode : masterNodes) {
                 newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
                 if (newChannel != null) {
-                    return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
+                    return getRemoteChannel(newChannel,taskInstanceId);
                 }
             }
             masterNodes = null;
@@ -141,6 +140,12 @@ public class TaskCallbackService {
         return remoteChannel;
     }
 
+    private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId){ // wraps and registers a channel when no opaque is available
+        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel); // single-argument constructor: opaque is set to -1
+        addRemoteChannel(taskInstanceId, remoteChannel); // register it under this task instance id
+        return remoteChannel;
+    }
+
     /**
      *  remove callback channels
      * @param taskInstanceId taskInstanceId
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 4ca110f..5ecc2c7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -36,6 +37,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -101,12 +103,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                 new NettyRemoteChannel(channel, command.getOpaque()));
 
-        try {
-            this.doAck(taskExecutionContext);
-        }catch (Exception e){
-            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
-            this.doAck(taskExecutionContext);
-        }
+        this.doAck(taskExecutionContext);
 
         // submit task
         workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService));
@@ -115,6 +112,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     private void doAck(TaskExecutionContext taskExecutionContext){
         // tell master that task is in executing
         TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
+        ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),ackCommand.convert2Command(),Event.ACK); // cache the ack before sending; NOTE(review): convert2Command() is invoked again on the next line, so the cached and the sent Command presumably are separate objects — confirm that is intended
         taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
new file mode 100644
index 0000000..ea9bb03
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.thread.Stopper;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * Retry Report Task Status Thread : background loop on the worker that
+ * periodically re-sends every ACK and RESULT command still held in
+ * {@link ResponceCache}. Entries are only read here, never removed.
+ */
+@Component
+public class RetryReportTaskStatusThread implements Runnable {
+
+    private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
+
+    /**
+     * pause between two retry passes : 5 minutes
+     */
+    private static final long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L;
+
+    /**
+     *  task callback service
+     */
+    private final TaskCallbackService taskCallbackService;
+
+    /**
+     * start the retry loop on a new thread named "RetryReportTaskStatusThread"
+     */
+    public void start(){
+        Thread thread = new Thread(this,"RetryReportTaskStatusThread");
+        thread.start();
+    }
+
+    /**
+     * looks up the TaskCallbackService bean through SpringApplicationContext
+     */
+    public RetryReportTaskStatusThread(){
+        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+    }
+
+    /**
+     * retry ack/response : while Stopper reports running, re-send all cached
+     * ACK commands with sendAck and all cached RESULT commands with sendResult,
+     * then sleep for RETRY_REPORT_TASK_STATUS_TIME. An exception during a pass
+     * is logged and ends that pass; the next pass starts after the sleep.
+     */
+    @Override
+    public void run() {
+        ResponceCache responceCache = ResponceCache.get();
+
+        while (Stopper.isRunning()){
+            try {
+                // iterating an empty map is a no-op, so no isEmpty() pre-check is needed
+                Map<Integer,Command> ackCache = responceCache.getAckCache();
+                for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
+                    Integer taskInstanceId = entry.getKey();
+                    Command ackCommand = entry.getValue();
+                    taskCallbackService.sendAck(taskInstanceId,ackCommand);
+                }
+
+                Map<Integer,Command> responseCache = responceCache.getResponseCache();
+                for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
+                    Integer taskInstanceId = entry.getKey();
+                    Command responseCommand = entry.getValue();
+                    // RESULT commands go through sendResult, like the first attempt, not sendAck
+                    taskCallbackService.sendResult(taskInstanceId,responseCommand);
+                }
+            }catch (Exception e){
+                logger.warn("retry report task status error", e);
+            }
+
+            ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME);
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 592060b..e224896 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.collections.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
@@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@@ -144,13 +146,10 @@ public class TaskExecuteThread implements Runnable {
             responseCommand.setProcessId(task.getProcessId());
             responseCommand.setAppIds(task.getAppIds());
         } finally {
-            try {
-                taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
-            }catch (Exception e){
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
-                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
-            }
+            taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+            ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
+            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+
         }
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index dcba832..a22663e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -44,7 +44,7 @@ public class TaskResponseServiceTest {
     @Test
     public void testAdd(){
         TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
-                "", "", "", 1);
+                "", "", "", 1,null);
         taskResponseService.addResponse(taskResponseEvent);
         Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
         try {
@@ -58,7 +58,7 @@ public class TaskResponseServiceTest {
     @Test
     public void testStop(){
         TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(),
-                "", "", "", 1);
+                "", "", "", 1,null);
         taskResponseService.addResponse(taskResponseEvent);
         taskResponseService.stop();
         Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);