You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/18 03:40:46 UTC
[dolphinscheduler] branch 2.0.3-prepare updated: [2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 2.0.3-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.3-prepare by this push:
new d2bd95d [2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)
d2bd95d is described below
commit d2bd95d453eae507ec8d0c41f7d91ff0ba6f7468
Author: zwZjut <zw...@163.com>
AuthorDate: Tue Jan 18 11:40:40 2022 +0800
[2.0.3][Bug] [dolphinscheduler-remote] process is always running: netty writeAndFlush without retry when failed leads to worker response to master failed (#8081)
* to #8046
* to #8046
* to #8046
* to #8046
* to #8046
Co-authored-by: honghuo.zw <ho...@alibaba-inc.com>
---
.../remote/command/CommandType.java | 5 ++
.../remote/command/TaskKillAckCommand.java | 71 ++++++++++++++++++++++
.../processor/queue/TaskResponsePersistThread.java | 3 +
.../server/worker/WorkerServer.java | 2 +
.../server/worker/cache/ResponceCache.java | 17 ++++++
.../worker/processor/TaskKillAckProcessor.java | 60 ++++++++++++++++++
.../server/worker/processor/TaskKillProcessor.java | 24 ++++----
.../worker/runner/RetryReportTaskStatusThread.java | 8 +++
8 files changed, 178 insertions(+), 12 deletions(-)
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index cf2cfe5..b204267 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -99,6 +99,11 @@ public enum CommandType {
TASK_KILL_RESPONSE,
/**
+ * kill task response ack
+ */
+ TASK_KILL_RESPONSE_ACK,
+
+ /**
* HEART_BEAT
*/
HEART_BEAT,
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java
new file mode 100644
index 0000000..61775d5
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+
+import java.io.Serializable;
+
+public class TaskKillAckCommand implements Serializable {
+
+ private int taskInstanceId;
+ private int status;
+
+ public TaskKillAckCommand() {
+ super();
+ }
+
+ public TaskKillAckCommand(int status, int taskInstanceId) {
+ this.status = status;
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getTaskInstanceId() {
+ return taskInstanceId;
+ }
+
+ public void setTaskInstanceId(int taskInstanceId) {
+ this.taskInstanceId = taskInstanceId;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ /**
+ * package response command
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.TASK_KILL_RESPONSE_ACK);
+ byte[] body = JSONUtils.toJsonByteArray(this);
+ command.setBody(body);
+ return command;
+ }
+
+ @Override
+ public String toString() {
+ return "KillTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index ca8ad0a..0e9a0b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@@ -157,6 +158,8 @@ public class TaskResponsePersistThread implements Runnable {
logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
}
}
+ TaskKillAckCommand taskKillAckCommand = new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskKillAckCommand.convert2Command());
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index c66718f..ff0067c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
@@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
index 3639b8e..9ee91f2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
@@ -38,6 +38,7 @@ public class ResponceCache {
private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+ private final Map<Integer,Command> killResponseCache = new ConcurrentHashMap<>();
/**
@@ -54,6 +55,9 @@ public class ResponceCache {
case RESULT:
responseCache.put(taskInstanceId,command);
break;
+ case ACTION_STOP:
+ killResponseCache.put(taskInstanceId,command);
+ break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
@@ -69,6 +73,19 @@ public class ResponceCache {
}
/**
+ * remove kill response cache
+ *
+ * @param taskInstanceId taskInstanceId
+ */
+ public void removeKillResponseCache(Integer taskInstanceId) {
+ killResponseCache.remove(taskInstanceId);
+ }
+
+ public Map<Integer, Command> getKillResponseCache() {
+ return killResponseCache;
+ }
+
+ /**
* remove reponse cache
* @param taskInstanceId taskInstanceId
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java
new file mode 100644
index 0000000..bbb2c8f
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+public class TaskKillAckProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(TaskKillAckProcessor.class);
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE_ACK == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ TaskKillAckCommand taskKillAckCommand = JSONUtils.parseObject(
+ command.getBody(), TaskKillAckCommand.class);
+
+ if (taskKillAckCommand == null) {
+ return;
+ }
+
+ if (taskKillAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ ResponceCache.get().removeKillResponseCache(taskKillAckCommand.getTaskInstanceId());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillAckCommand.getTaskInstanceId());
+ logger.debug("removeKillResponseCache: taskinstance id:{}", taskKillAckCommand.getTaskInstanceId());
+ TaskCallbackService.remove(taskKillAckCommand.getTaskInstanceId());
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskKillAckCommand.getTaskInstanceId());
+ }
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 4f235ea..2cf70ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -93,16 +95,19 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
logger.info("received kill command : {}", killCommand);
- Pair<Boolean, List<String>> result = doKill(killCommand);
-
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
- TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
+ Pair<Boolean, List<String>> result = doKill(killCommand);
+
+ TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+
+ if (taskRequest == null) {
+ return;
+ }
+ TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskRequest, result);
+ ResponceCache.get().cache(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command(), Event.ACTION_STOP);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
- TaskCallbackService.remove(killCommand.getTaskInstanceId());
- logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
/**
@@ -116,7 +121,6 @@ public class TaskKillProcessor implements NettyRequestProcessor {
int taskInstanceId = killCommand.getTaskInstanceId();
TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
-
try {
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
@@ -161,15 +165,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param result exe result
* @return build TaskKillResponseCommand
*/
- private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand,
+ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskRequest taskRequest,
Pair<Boolean, List<String>> result) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setAppIds(result.getRight());
- TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
- if (taskRequest == null) {
- return taskKillResponseCommand;
- }
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
if (taskExecutionContext != null) {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index f52be9d..d99c84d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -88,6 +88,14 @@ public class RetryReportTaskStatusThread implements Runnable {
taskCallbackService.sendResult(taskInstanceId,responseCommand);
}
}
+ if (!responceCache.getKillResponseCache().isEmpty()) {
+ Map<Integer, Command> killResponseCache = responceCache.getKillResponseCache();
+ for (Map.Entry<Integer, Command> entry : killResponseCache.entrySet()) {
+ Integer taskInstanceId = entry.getKey();
+ Command killResponseCommand = entry.getValue();
+ taskCallbackService.sendResult(taskInstanceId, killResponseCommand);
+ }
+ }
}catch (Exception e){
logger.warn("retry report task status error", e);
}