You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/18 03:40:46 UTC

[dolphinscheduler] branch 2.0.3-prepare updated: [2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
     new d2bd95d  [2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)
d2bd95d is described below

commit d2bd95d453eae507ec8d0c41f7d91ff0ba6f7468
Author: zwZjut <zw...@163.com>
AuthorDate: Tue Jan 18 11:40:40 2022 +0800

    [2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)
    
    * to #8046
    
    * to #8046
    
    * to #8046
    
    * to #8046
    
    * to #8046
    
    Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
 .../remote/command/CommandType.java                |  5 ++
 .../remote/command/TaskKillAckCommand.java         | 71 ++++++++++++++++++++++
 .../processor/queue/TaskResponsePersistThread.java |  3 +
 .../server/worker/WorkerServer.java                |  2 +
 .../server/worker/cache/ResponceCache.java         | 17 ++++++
 .../worker/processor/TaskKillAckProcessor.java     | 60 ++++++++++++++++++
 .../server/worker/processor/TaskKillProcessor.java | 24 ++++----
 .../worker/runner/RetryReportTaskStatusThread.java |  8 +++
 8 files changed, 178 insertions(+), 12 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index cf2cfe5..b204267 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -99,6 +99,11 @@ public enum CommandType {
     TASK_KILL_RESPONSE,
 
     /**
+     * kill task response ack
+     */
+    TASK_KILL_RESPONSE_ACK,
+
+    /**
      * HEART_BEAT
      */
     HEART_BEAT,
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java
new file mode 100644
index 0000000..61775d5
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+public class TaskKillAckCommand implements Serializable { // payload of a TASK_KILL_RESPONSE_ACK command: a task instance id plus a status code
+
+    private int taskInstanceId; // id of the task instance whose kill response is being acknowledged
+    private int status; // status code; the receiver compares it with ExecutionStatus.SUCCESS.getCode()
+
+    public TaskKillAckCommand() { // no-arg constructor; presumably required when the body is parsed back via JSONUtils.parseObject — TODO confirm
+        super();
+    }
+
+    public TaskKillAckCommand(int status, int taskInstanceId) { // note the argument order: status first, then taskInstanceId (both are int, so a swap compiles silently)
+        this.status = status;
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getTaskInstanceId() {
+        return taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    /**
+     * Wraps this ack in a new Command of type TASK_KILL_RESPONSE_ACK whose body is this object serialized by JSONUtils.toJsonByteArray.
+     *
+     * @return a freshly created Command carrying this ack as its body
+     */
+    public Command convert2Command() {
+        Command command = new Command();
+        command.setType(CommandType.TASK_KILL_RESPONSE_ACK);
+        byte[] body = JSONUtils.toJsonByteArray(this); // the whole object (taskInstanceId and status) becomes the command body
+        command.setBody(body);
+        return command;
+    }
+
+    @Override
+    public String toString() {
+        return "KillTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; // NOTE(review): the label reads "KillTaskAckCommand", which differs from the class name TaskKillAckCommand
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index ca8ad0a..0e9a0b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@@ -157,6 +158,8 @@ public class TaskResponsePersistThread implements Runnable {
                         logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
                     }
                 }
+                TaskKillAckCommand taskKillAckCommand = new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+                channel.writeAndFlush(taskKillAckCommand.convert2Command());
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index c66718f..ff0067c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
@@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
index 3639b8e..9ee91f2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
@@ -38,6 +38,7 @@ public class ResponceCache {
 
     private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
     private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+    private final Map<Integer,Command> killResponseCache = new ConcurrentHashMap<>();
 
 
     /**
@@ -54,6 +55,9 @@ public class ResponceCache {
             case RESULT:
                 responseCache.put(taskInstanceId,command);
                 break;
+            case ACTION_STOP:
+                killResponseCache.put(taskInstanceId,command);
+                break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + event);
         }
@@ -69,6 +73,19 @@ public class ResponceCache {
     }
 
     /**
+     * Removes the cached kill response command stored for the given task instance, if any.
+     *
+     * @param taskInstanceId id of the task instance whose entry is dropped from killResponseCache
+     */
+    public void removeKillResponseCache(Integer taskInstanceId) {
+        killResponseCache.remove(taskInstanceId);
+    }
+
+    public Map<Integer, Command> getKillResponseCache() { // accessor for the kill response cache, keyed by task instance id
+        return killResponseCache; // returns the backing ConcurrentHashMap itself, not a copy, so callers see and can mutate live state
+    }
+
+    /**
      * remove reponse cache
      * @param taskInstanceId taskInstanceId
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java
new file mode 100644
index 0000000..bbb2c8f
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+public class TaskKillAckProcessor implements NettyRequestProcessor { // handles TASK_KILL_RESPONSE_ACK commands by clearing the state kept for the acknowledged kill response
+
+    private final Logger logger = LoggerFactory.getLogger(TaskKillAckProcessor.class);
+
+    @Override
+    public void process(Channel channel, Command command) { // the channel argument is not used by this method
+        Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE_ACK == command.getType(), // any other command type is rejected with an IllegalArgumentException
+                String.format("invalid command type : %s", command.getType()));
+
+        TaskKillAckCommand taskKillAckCommand = JSONUtils.parseObject(
+                command.getBody(), TaskKillAckCommand.class);
+
+        if (taskKillAckCommand == null) { // no ack object was produced from the body, so the command is ignored silently
+            return;
+        }
+
+        if (taskKillAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { // only a SUCCESS ack triggers cleanup; any other status leaves the cached entries in place
+            ResponceCache.get().removeKillResponseCache(taskKillAckCommand.getTaskInstanceId()); // drop the cached kill response so RetryReportTaskStatusThread no longer re-sends it
+            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillAckCommand.getTaskInstanceId()); // drop the cached execution context for this task instance
+            logger.debug("removeKillResponseCache: taskinstance id:{}", taskKillAckCommand.getTaskInstanceId());
+            TaskCallbackService.remove(taskKillAckCommand.getTaskInstanceId()); // drop the callback entry for this task instance (the remote channel, going by the log text below — TODO confirm)
+            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskKillAckCommand.getTaskInstanceId());
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 4f235ea..2cf70ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.server.worker.processor;
 
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.remote.utils.Pair;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -93,16 +95,19 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
         logger.info("received kill command : {}", killCommand);
 
-        Pair<Boolean, List<String>> result = doKill(killCommand);
-
         taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
                 new NettyRemoteChannel(channel, command.getOpaque()));
 
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
+        Pair<Boolean, List<String>> result = doKill(killCommand);
+
+        TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+
+        if (taskRequest == null) {
+            return;
+        }
+        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskRequest, result);
+        ResponceCache.get().cache(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command(), Event.ACTION_STOP);
         taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
-        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
-        TaskCallbackService.remove(killCommand.getTaskInstanceId());
-        logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
     }
 
     /**
@@ -116,7 +121,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         int taskInstanceId = killCommand.getTaskInstanceId();
         TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
         TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
-
         try {
             Integer processId = taskExecutionContext.getProcessId();
             if (processId.equals(0)) {
@@ -161,15 +165,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
      * @param result exe result
      * @return build TaskKillResponseCommand
      */
-    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
+    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskRequest taskRequest,
                                                                  Pair<Boolean, List<String>> result) {
         TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
         taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
         taskKillResponseCommand.setAppIds(result.getRight());
-        TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
-        if (taskRequest == null) {
-            return taskKillResponseCommand;
-        }
         TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
         if (taskExecutionContext != null) {
             taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index f52be9d..d99c84d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -88,6 +88,14 @@ public class RetryReportTaskStatusThread implements Runnable {
                         taskCallbackService.sendResult(taskInstanceId,responseCommand);
                     }
                 }
+                if (!responceCache.getKillResponseCache().isEmpty()) {
+                    Map<Integer, Command> killResponseCache = responceCache.getKillResponseCache();
+                    for (Map.Entry<Integer, Command> entry : killResponseCache.entrySet()) {
+                        Integer taskInstanceId = entry.getKey();
+                        Command killResponseCommand = entry.getValue();
+                        taskCallbackService.sendResult(taskInstanceId, killResponseCommand);
+                    }
+                }
             }catch (Exception e){
                 logger.warn("retry report task status error", e);
             }