You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/12 06:08:48 UTC
[dolphinscheduler] branch dev updated: [Fix-10827] Fix network error cause worker cannot send message to master (#10886)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new cade66a9b6 [Fix-10827] Fix network error cause worker cannot send message to master (#10886)
cade66a9b6 is described below
commit cade66a9b6b14450c6f778f7e05820b588856a41
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jul 12 14:08:42 2022 +0800
[Fix-10827] Fix network error cause worker cannot send message to master (#10886)
* Fix network error cause worker cannot send message to master
---
.../server/master/config/MasterConfig.java | 6 +
.../master/consumer/TaskPriorityQueueConsumer.java | 8 +-
.../dispatch/executor/NettyExecutorManager.java | 4 +-
.../server/master/event/TaskDelayEventHandler.java | 8 +-
.../event/TaskRejectByWorkerEventHandler.java | 19 +-
.../master/event/TaskResultEventHandler.java | 20 +-
.../master/event/TaskRunningEventHandler.java | 8 +-
.../processor/TaskExecuteResponseProcessor.java | 17 +-
.../processor/TaskExecuteRunningProcessor.java | 6 +-
.../master/processor/TaskRecallProcessor.java | 6 +-
.../server/master/processor/queue/TaskEvent.java | 8 +-
.../server/master/rpc/MasterRPCServer.java | 4 +-
.../master/runner/StateWheelExecuteThread.java | 2 -
.../master/dispatch/ExecutionContextTestUtils.java | 16 +-
.../master/dispatch/ExecutorDispatcherTest.java | 6 +-
.../executor/NettyExecutorManagerTest.java | 25 +-
.../master/processor/TaskAckProcessorTest.java | 20 +-
.../processor/queue/TaskResponseServiceTest.java | 47 ++--
.../remote/command/BaseCommand.java | 57 +++++
.../remote/command/CommandHeader.java | 47 +---
.../remote/command/CommandType.java | 39 +--
.../remote/command/HostUpdateCommand.java | 3 -
.../remote/command/StateEventResponseCommand.java | 2 +-
...equestCommand.java => TaskDispatchCommand.java} | 24 +-
...lAckCommand.java => TaskExecuteAckCommand.java} | 48 ++--
.../remote/command/TaskExecuteResponseCommand.java | 212 ----------------
...kCommand.java => TaskExecuteResultCommand.java} | 97 +++++---
...mand.java => TaskExecuteRunningAckMessage.java} | 6 +-
.../remote/command/TaskExecuteRunningCommand.java | 101 +-------
...llAckCommand.java => TaskRejectAckCommand.java} | 47 ++--
...skRecallCommand.java => TaskRejectCommand.java} | 48 +---
.../apache/dolphinscheduler/remote/utils/Host.java | 3 +-
.../server/worker/WorkerServer.java | 18 +-
.../server/worker/cache/ResponseCache.java | 115 ---------
.../server/worker/config/WorkerConfig.java | 8 +-
.../server/worker/message/MessageRetryRunner.java | 139 +++++++++++
.../server/worker/message/MessageSender.java | 49 ++--
.../message/TaskExecuteResultMessageSender.java | 70 ++++++
.../message/TaskExecuteRunningMessageSender.java | 67 +++++
.../worker/message/TaskRejectMessageSender.java | 59 +++++
.../worker/processor/HostUpdateProcessor.java | 13 +-
.../worker/processor/TaskCallbackService.java | 276 ---------------------
...teProcessor.java => TaskDispatchProcessor.java} | 78 +++---
.../processor/TaskExecuteResponseAckProcessor.java | 71 ------
.../processor/TaskExecuteResultAckProcessor.java | 83 +++++++
.../processor/TaskExecuteRunningAckProcessor.java | 17 +-
.../server/worker/processor/TaskKillProcessor.java | 50 ++--
...kProcessor.java => TaskRejectAckProcessor.java} | 40 +--
.../server/worker/rpc/WorkerMessageSender.java | 92 +++++++
.../server/worker/rpc/WorkerRpcClient.java | 75 ++++++
.../server/worker/rpc/WorkerRpcServer.java | 18 +-
.../worker/runner/RetryReportTaskStatusThread.java | 134 ----------
.../server/worker/runner/TaskExecuteThread.java | 85 ++++---
.../server/worker/runner/WorkerExecService.java | 6 +-
.../server/worker/runner/WorkerManagerThread.java | 19 +-
...sorTest.java => TaskDispatchProcessorTest.java} | 92 ++++---
.../worker/runner/TaskExecuteThreadTest.java | 12 +-
57 files changed, 1224 insertions(+), 1426 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 0494b7dd00..fe51e20984 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.config;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -82,6 +83,10 @@ public class MasterConfig implements Validator {
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
+ /**
+ * ip:listenPort
+ */
+ private String masterAddress;
@Override
public boolean supports(Class<?> clazz) {
@@ -124,5 +129,6 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
+ masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 82198e942a..9c7d048ff8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -241,7 +241,11 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
+ // todo: we didn't set the host here, since right now we didn't need to retry this message.
+ TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
+ masterConfig.getMasterAddress(),
+ taskExecutionContext.getHost(),
+ System.currentTimeMillis());
return requestCommand.convert2Command();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index fe172f9816..e273e8f8aa 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -88,10 +88,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@PostConstruct
public void init() {
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
}
/**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
index 8e2dcced23..25fa88dd00 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -108,9 +108,9 @@ public class TaskDelayEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
- new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+ TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
+ new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
index d09a8364ca..ac4dab50f2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
@@ -20,8 +20,9 @@ package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -34,13 +35,16 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ @Autowired
+ private MasterConfig masterConfig;
+
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
- WorkflowExecuteRunnable workflowExecuteRunnable =
- this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
+ processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
@@ -65,9 +69,12 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
- TaskRecallAckCommand taskRecallAckCommand =
- new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
+ TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
+ taskEvent.getTaskInstanceId(),
+ masterConfig.getMasterAddress(),
+ taskEvent.getWorkerAddress(),
+ System.currentTimeMillis());
+ taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
index 67df03682f..0373baeee1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
@@ -22,8 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -50,13 +51,16 @@ public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;
+ @Autowired
+ private MasterConfig masterConfig;
+
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
- WorkflowExecuteRunnable workflowExecuteRunnable =
- this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
+ WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
+ processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
@@ -105,9 +109,13 @@ public class TaskResultEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
- new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
+ // we didn't set the receiver address, since the ack doesn't need to retry
+ TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
+ taskEvent.getTaskInstanceId(),
+ masterConfig.getMasterAddress(),
+ taskEvent.getWorkerAddress(),
+ System.currentTimeMillis());
+ taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
index 31152973a2..d271839596 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
@@ -106,9 +106,9 @@ public class TaskRunningEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
- TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
- new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
- taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
+ TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
+ new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index b17def873c..7f438e1d9c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -55,15 +55,18 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
- TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
- TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+ TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
+ TaskExecuteResultCommand.class);
+ TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
- logger.info("Received task execute response, event: {}", taskResponseEvent);
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
+ taskResultEvent.getTaskInstanceId());
+ logger.info("Received task execute result, event: {}", taskResultEvent);
- taskEventService.addEvent(taskResponseEvent);
+ taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index c5ddc3d77a..96ff1ca405 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
- TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
- logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
+ TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
+ logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
- TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+ TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
taskEventService.addEvent(taskEvent);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
index e37dc6e06a..a5d404ec65 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -54,8 +54,8 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
- TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
+ Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
+ TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 248d253739..e383cad612 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@@ -120,7 +120,7 @@ public class TaskEvent {
return event;
}
- public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
+ public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@@ -138,7 +138,7 @@ public class TaskEvent {
return event;
}
- public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
+ public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
index 49d40a4b04..b6b86979de 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
@@ -84,14 +84,14 @@ public class MasterRPCServer implements AutoCloseable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
// logger server
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 843c1b246c..0d1b3a423a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -126,8 +126,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
- } else {
- logger.warn("Failed to remove workflow instance from timeout check list");
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
index 55dd236f71..7252f747ba 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java
@@ -24,12 +24,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.mockito.Mockito;
@@ -44,12 +43,15 @@ public class ExecutionContextTestUtils {
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .buildProcessDefinitionRelatedInfo(processDefinition)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .buildProcessDefinitionRelatedInfo(processDefinition)
+ .create();
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
+ TaskDispatchCommand requestCommand = new TaskDispatchCommand(context,
+ "127.0.0.1:5678",
+ "127.0.0.1:5678",
+ System.currentTimeMillis());
Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER, taskInstance);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
index 4bfc3ca0a6..fa80e8344a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -18,11 +18,12 @@
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.junit.Ignore;
@@ -60,7 +61,8 @@ public class ExecutorDispatcherTest {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(port);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
- nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, Mockito.mock(TaskExecuteProcessor.class));
+ nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, Mockito.mock(
+ TaskDispatchProcessor.class));
nettyRemotingServer.start();
//
workerConfig.setListenPort(port);
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
index 1ee890faf2..b69ab890f9 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
@@ -25,15 +25,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
import org.junit.Assert;
import org.junit.Ignore;
@@ -49,16 +48,15 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@Ignore
public class NettyExecutorManagerTest {
-
@Autowired
private NettyExecutorManager nettyExecutorManager;
-
@Test
public void testExecute() throws ExecuteException {
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
- nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
+ nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_DISPATCH_REQUEST,
+ new TaskDispatchProcessor());
nettyRemotingServer.start();
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
@@ -66,10 +64,10 @@ public class NettyExecutorManagerTest {
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .buildProcessDefinitionRelatedInfo(processDefinition)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .buildProcessDefinitionRelatedInfo(processDefinition)
+ .create();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
@@ -94,10 +92,11 @@ public class NettyExecutorManagerTest {
nettyExecutorManager.execute(executionContext);
}
-
private Command toCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
- requestCommand.setTaskExecutionContext(taskExecutionContext);
+ TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
+ "127.0.0.1:5678",
+ "127.0.0.1:1234",
+ System.currentTimeMillis());
return requestCommand.convert2Command();
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
index fc320ab493..8c15cd9cbe 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
private TaskEventService taskEventService;
private ProcessService processService;
- private TaskExecuteRunningCommand taskExecuteRunningCommand;
+ private TaskExecuteRunningCommand taskExecuteRunningMessage;
private TaskEvent taskResponseEvent;
private Channel channel;
@@ -63,14 +63,16 @@ public class TaskAckProcessorTest {
channel = PowerMockito.mock(Channel.class);
taskResponseEvent = PowerMockito.mock(TaskEvent.class);
- taskExecuteRunningCommand = new TaskExecuteRunningCommand();
- taskExecuteRunningCommand.setStatus(1);
- taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
- taskExecuteRunningCommand.setHost("localhost");
- taskExecuteRunningCommand.setLogPath("/temp/worker.log");
- taskExecuteRunningCommand.setStartTime(new Date());
- taskExecuteRunningCommand.setTaskInstanceId(1);
- taskExecuteRunningCommand.setProcessInstanceId(1);
+ taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
+ " 127.0.0.1:1234",
+ System.currentTimeMillis());
+ taskExecuteRunningMessage.setStatus(1);
+ taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker");
+ taskExecuteRunningMessage.setHost("localhost");
+ taskExecuteRunningMessage.setLogPath("/temp/worker.log");
+ taskExecuteRunningMessage.setStartTime(new Date());
+ taskExecuteRunningMessage.setTaskInstanceId(1);
+ taskExecuteRunningMessage.setProcessInstanceId(1);
}
@Test
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index c5b00db12c..3854ad77b0 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -76,26 +77,30 @@ public class TaskResponseServiceTest {
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
- TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
- taskExecuteRunningCommand.setProcessId(1);
- taskExecuteRunningCommand.setTaskInstanceId(22);
- taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
- taskExecuteRunningCommand.setExecutePath("path");
- taskExecuteRunningCommand.setLogPath("logPath");
- taskExecuteRunningCommand.setHost("127.*.*.*");
- taskExecuteRunningCommand.setStartTime(new Date());
-
- ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
-
- TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
- taskExecuteResponseCommand.setProcessInstanceId(1);
- taskExecuteResponseCommand.setTaskInstanceId(22);
- taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
- taskExecuteResponseCommand.setEndTime(new Date());
- taskExecuteResponseCommand.setVarPool("varPol");
- taskExecuteResponseCommand.setAppIds("ids");
- taskExecuteResponseCommand.setProcessId(1);
- resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+ TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
+ "127.0.0.1:1234",
+ System.currentTimeMillis());
+ taskExecuteRunningMessage.setProcessId(1);
+ taskExecuteRunningMessage.setTaskInstanceId(22);
+ taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
+ taskExecuteRunningMessage.setExecutePath("path");
+ taskExecuteRunningMessage.setLogPath("logPath");
+ taskExecuteRunningMessage.setHost("127.*.*.*");
+ taskExecuteRunningMessage.setStartTime(new Date());
+
+ ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
+
+ TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
+ NetUtils.getAddr(5678),
+ System.currentTimeMillis());
+ taskExecuteResultMessage.setProcessInstanceId(1);
+ taskExecuteResultMessage.setTaskInstanceId(22);
+ taskExecuteResultMessage.setStatus(ExecutionStatus.SUCCESS.getCode());
+ taskExecuteResultMessage.setEndTime(new Date());
+ taskExecuteResultMessage.setVarPool("varPol");
+ taskExecuteResultMessage.setAppIds("ids");
+ taskExecuteResultMessage.setProcessId(1);
+ resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java
new file mode 100644
index 0000000000..8207b5fecc
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Base class for RPC messages that carry their own routing information.
+ * <p>
+ * Messages are sent asynchronously: the client sends a message and waits for the target server to
+ * send an ack for it, and the client retries for a while if it does not receive that ack.
+ * <p>
+ * When a network error occurs, the server cannot send the ack back over the original channel,
+ * because the client has already closed it; the server therefore needs to know where the
+ * message came from, which is why every message records its sender and receiver addresses.
+ */
+@Data
+@NoArgsConstructor
+public abstract class BaseCommand implements Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    /**
+     * Address of the message sender; the receiver uses it when it needs to send an ack back.
+     */
+    protected String messageSenderAddress;
+
+    /** Address of the message receiver. */
+    protected String messageReceiverAddress;
+
+    /** Time the message was sent; presumably epoch millis, e.g. {@code System.currentTimeMillis()}. */
+    protected long messageSendTime;
+
+    protected BaseCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        this.messageSenderAddress = messageSenderAddress;
+        this.messageReceiverAddress = messageReceiverAddress;
+        this.messageSendTime = messageSendTime;
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
index 9e83a426f9..32eb75dc90 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java
@@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
+import lombok.Data;
+
/**
* command header
*/
+@Data
public class CommandHeader implements Serializable {
/**
@@ -34,12 +38,12 @@ public class CommandHeader implements Serializable {
private long opaque;
/**
- * context length
+ * context length
*/
private int contextLength;
/**
- * context
+ * context
*/
private byte[] context;
@@ -48,43 +52,4 @@ public class CommandHeader implements Serializable {
*/
private int bodyLength;
- public int getBodyLength() {
- return bodyLength;
- }
-
- public void setBodyLength(int bodyLength) {
- this.bodyLength = bodyLength;
- }
-
- public byte getType() {
- return type;
- }
-
- public void setType(byte type) {
- this.type = type;
- }
-
- public long getOpaque() {
- return opaque;
- }
-
- public void setOpaque(long opaque) {
- this.opaque = opaque;
- }
-
- public int getContextLength() {
- return contextLength;
- }
-
- public void setContextLength(int contextLength) {
- this.contextLength = contextLength;
- }
-
- public byte[] getContext() {
- return context;
- }
-
- public void setContext(byte[] context) {
- this.context = context;
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 43a64fc559..040d82d974 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -64,9 +64,9 @@ public enum CommandType {
MASTER_RESPONSE,
/**
- * execute task request
+ * dispatch task request
*/
- TASK_EXECUTE_REQUEST,
+ TASK_DISPATCH_REQUEST,
/**
* task execute running, from worker to master
@@ -81,56 +81,29 @@ public enum CommandType {
/**
* task execute response, from worker to master
*/
- TASK_EXECUTE_RESPONSE,
+ TASK_EXECUTE_RESULT,
/**
* task execute response ack, from master to worker
*/
- TASK_EXECUTE_RESPONSE_ACK,
+ TASK_EXECUTE_RESULT_ACK,
- /**
- * kill task
- */
TASK_KILL_REQUEST,
- /**
- * kill task response
- */
TASK_KILL_RESPONSE,
- /**
- * task recall
- */
- TASK_RECALL,
+ TASK_REJECT,
- /**
- * task recall ack
- */
- TASK_RECALL_ACK,
+ TASK_REJECT_ACK,
- /**
- * HEART_BEAT
- */
HEART_BEAT,
- /**
- * ping
- */
PING,
- /**
- * pong
- */
PONG,
- /**
- * alert send request
- */
ALERT_SEND_REQUEST,
- /**
- * alert send response
- */
ALERT_SEND_RESPONSE,
/**
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
index 4fc752e4b0..d060a8e0f5 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateCommand.java
@@ -26,9 +26,6 @@ import java.io.Serializable;
*/
public class HostUpdateCommand implements Serializable {
- /**
- * task id
- */
private int taskInstanceId;
private String processHost;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
index 41ee1ef2ee..5500c744bd 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
@@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
+ command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
similarity index 66%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
index 000e3f4e02..f04c6107c1 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java
@@ -20,24 +20,36 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import java.io.Serializable;
-
-import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * The task dispatch message, means dispatch a task to worker.
+ */
@Data
@NoArgsConstructor
-@AllArgsConstructor
-public class TaskExecuteRequestCommand implements Serializable {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskDispatchCommand extends BaseCommand {
private static final long serialVersionUID = -1L;
private TaskExecutionContext taskExecutionContext;
+    /**
+     * Creates a dispatch message for the given task.
+     *
+     * @param taskExecutionContext   the execution context of the task being dispatched
+     * @param messageSenderAddress   the address of this message's sender, used by the receiver to reply
+     * @param messageReceiverAddress the address of this message's receiver
+     * @param messageSendTime        the time the message is sent
+     */
+    public TaskDispatchCommand(TaskExecutionContext taskExecutionContext,
+                               String messageSenderAddress,
+                               String messageReceiverAddress,
+                               long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+        this.taskExecutionContext = taskExecutionContext;
+    }
+
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_REQUEST);
+ command.setType(CommandType.TASK_DISPATCH_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
similarity index 63%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
copy to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
index 2221a6c09d..a70cf8f239 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
@@ -19,41 +19,34 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
/**
- * task recall ack command
+ * task execute response ack command
+ * from master to worker
*/
-public class TaskRecallAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
- public TaskRecallAckCommand() {
- super();
- }
-
-    public TaskRecallAckCommand(int status, int taskInstanceId) {
+    /**
+     * Creates an ack for the task instance identified by {@code taskInstanceId}.
+     *
+     * @param status                 the status value carried by the ack
+     * @param taskInstanceId         the id of the task instance being acknowledged
+     * @param messageSenderAddress   the address of this message's sender, used by the receiver to reply
+     * @param messageReceiverAddress the address of this message's receiver
+     * @param messageSendTime        the time the message is sent
+     */
+    public TaskExecuteAckCommand(int status,
+                                 int taskInstanceId,
+                                 String messageSenderAddress,
+                                 String messageReceiverAddress,
+                                 long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
this.status = status;
this.taskInstanceId = taskInstanceId;
}
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
/**
* package response command
*
@@ -61,14 +54,9 @@ public class TaskRecallAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_RECALL_ACK);
+ command.setType(CommandType.TASK_EXECUTE_RESULT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
-
- @Override
- public String toString() {
- return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
deleted file mode 100644
index 4574b8818c..0000000000
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.remote.command;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * execute task response command
- */
-public class TaskExecuteResponseCommand implements Serializable {
-
- public TaskExecuteResponseCommand() {
- }
-
- public TaskExecuteResponseCommand(int taskInstanceId, int processInstanceId) {
- this.taskInstanceId = taskInstanceId;
- this.processInstanceId = processInstanceId;
- }
-
- /**
- * task instance id
- */
- private int taskInstanceId;
-
- /**
- * process instance id
- */
- private int processInstanceId;
-
- /**
- * status
- */
- private int status;
-
- /**
- * startTime
- */
- private Date startTime;
-
- /**
- * host
- */
- private String host;
-
- /**
- * logPath
- */
- private String logPath;
-
- /**
- * executePath
- */
- private String executePath;
-
-
- /**
- * end time
- */
- private Date endTime;
-
-
- /**
- * processId
- */
- private int processId;
-
- /**
- * appIds
- */
- private String appIds;
-
- /**
- * varPool string
- */
- private String varPool;
-
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
- public String getExecutePath() {
- return executePath;
- }
-
- public void setExecutePath(String executePath) {
- this.executePath = executePath;
- }
-
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public String getVarPool() {
- return varPool;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public Date getEndTime() {
- return endTime;
- }
-
- public void setEndTime(Date endTime) {
- this.endTime = endTime;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
- }
-
- public void setAppIds(String appIds) {
- this.appIds = appIds;
- }
-
- /**
- * package response command
- *
- * @return command
- */
- public Command convert2Command() {
- Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_RESPONSE);
- byte[] body = JSONUtils.toJsonByteArray(this);
- command.setBody(body);
- return command;
- }
-
- @Override
- public String toString() {
- return "TaskExecuteResponseCommand{"
- + "taskInstanceId=" + taskInstanceId
- + ", processInstanceId=" + processInstanceId
- + ", status=" + status
- + ", startTime=" + startTime
- + ", endTime=" + endTime
- + ", host=" + host
- + ", logPath=" + logPath
- + ", executePath=" + executePath
- + ", processId=" + processId
- + ", appIds='" + appIds + '\''
- + ", varPool=" + varPool
- + '}';
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
similarity index 51%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
index f3df257b9d..12dfa56976 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java
@@ -19,41 +19,82 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import java.io.Serializable;
+import java.util.Date;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
/**
- * task execute response ack command
- * from master to worker
+ * execute task response command
*/
-public class TaskExecuteResponseAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteResultCommand extends BaseCommand {
+
+    /**
+     * Creates a task execute result message with its routing information; the remaining
+     * fields are populated through the setters generated by {@code @Data}.
+     *
+     * @param messageSenderAddress   the address of this message's sender, used by the receiver to reply
+     * @param messageReceiverAddress the address of this message's receiver
+     * @param messageSendTime        the time the message is sent
+     */
+    public TaskExecuteResultCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
+    }
+
+ /**
+ * task instance id
+ */
private int taskInstanceId;
+
+ /**
+ * process instance id
+ */
+ private int processInstanceId;
+
+ /**
+ * status
+ */
private int status;
- public TaskExecuteResponseAckCommand() {
- super();
- }
+ /**
+ * startTime
+ */
+ private Date startTime;
- public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
- this.status = status;
- this.taskInstanceId = taskInstanceId;
- }
+ /**
+ * host
+ */
+ private String host;
- public int getStatus() {
- return status;
- }
+ /**
+ * logPath
+ */
+ private String logPath;
- public void setStatus(int status) {
- this.status = status;
- }
+ /**
+ * executePath
+ */
+ private String executePath;
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
+ /**
+ * end time
+ */
+ private Date endTime;
+
+
+ /**
+ * processId
+ */
+ private int processId;
+
+ /**
+ * appIds
+ */
+ private String appIds;
+
+ /**
+ * varPool string
+ */
+ private String varPool;
/**
* package response command
@@ -62,17 +103,9 @@ public class TaskExecuteResponseAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
+ command.setType(CommandType.TASK_EXECUTE_RESULT);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
-
- @Override
- public String toString() {
- return "TaskExecuteResponseAckCommand{"
- + "taskInstanceId=" + taskInstanceId
- + ", status=" + status
- + '}';
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
similarity index 92%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
index b0bb666cba..3e17ade0ad 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckMessage.java
@@ -25,16 +25,16 @@ import java.io.Serializable;
* task execute running ack command
* from master to worker
*/
-public class TaskExecuteRunningAckCommand implements Serializable {
+public class TaskExecuteRunningAckMessage implements Serializable {
private int taskInstanceId;
private int status;
- public TaskExecuteRunningAckCommand() {
+ public TaskExecuteRunningAckMessage() {
super();
}
- public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
+ public TaskExecuteRunningAckMessage(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
index 0a2eac29f3..241d37744a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
@@ -19,14 +19,21 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import java.io.Serializable;
import java.util.Date;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
/**
- * task execute running command
- * from worker to master
+ * Task running message, means the task is running in worker.
*/
-public class TaskExecuteRunningCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskExecuteRunningCommand extends BaseCommand {
/**
* taskInstanceId
@@ -73,76 +80,8 @@ public class TaskExecuteRunningCommand implements Serializable {
*/
private String appIds;
- public Date getStartTime() {
- return startTime;
- }
-
- public void setStartTime(Date startTime) {
- this.startTime = startTime;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
-
- public String getLogPath() {
- return logPath;
- }
-
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
-
- public String getExecutePath() {
- return executePath;
- }
-
- public void setExecutePath(String executePath) {
- this.executePath = executePath;
- }
-
- public int getProcessId() {
- return processId;
- }
-
- public void setProcessId(int processId) {
- this.processId = processId;
- }
-
- public String getAppIds() {
- return appIds;
- }
-
- public void setAppIds(String appIds) {
- this.appIds = appIds;
+ public TaskExecuteRunningCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+ super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
@@ -158,18 +97,4 @@ public class TaskExecuteRunningCommand implements Serializable {
return command;
}
- @Override
- public String toString() {
- return "TaskExecuteRunningCommand{"
- + "taskInstanceId=" + taskInstanceId
- + ", processInstanceId='" + processInstanceId + '\''
- + ", startTime=" + startTime
- + ", host='" + host + '\''
- + ", status=" + status
- + ", logPath='" + logPath + '\''
- + ", executePath='" + executePath + '\''
- + ", processId=" + processId + '\''
- + ", appIds='" + appIds + '\''
- + '}';
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
similarity index 63%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
index 2221a6c09d..aed647bb4c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
@@ -19,41 +19,30 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
-/**
- * task recall ack command
- */
-public class TaskRecallAckCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskRejectAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
- public TaskRecallAckCommand() {
- super();
- }
-
- public TaskRecallAckCommand(int status, int taskInstanceId) {
+ public TaskRejectAckCommand(int status,
+ int taskInstanceId,
+ String messageSenderAddress,
+ String messageReceiverAddress,
+ long messageSendTime) {
+ super(messageSenderAddress, messageReceiverAddress, messageSendTime);
this.status = status;
this.taskInstanceId = taskInstanceId;
}
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
/**
* package response command
*
@@ -61,14 +50,10 @@ public class TaskRecallAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_RECALL_ACK);
+ command.setType(CommandType.TASK_REJECT_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
- @Override
- public String toString() {
- return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
similarity index 60%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
index 0cd81dc3e2..a9ce5c56a7 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
@@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import java.io.Serializable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
/**
- * kill task recall command
+ * Task reject message, means the task has been rejected by the worker.
*/
-public class TaskRecallCommand implements Serializable {
+@Data
+@NoArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class TaskRejectCommand extends BaseCommand {
/**
* taskInstanceId
@@ -41,28 +48,8 @@ public class TaskRecallCommand implements Serializable {
*/
private int processInstanceId;
- public int getTaskInstanceId() {
- return taskInstanceId;
- }
-
- public void setTaskInstanceId(int taskInstanceId) {
- this.taskInstanceId = taskInstanceId;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
+ public TaskRejectCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
+ super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}
/**
@@ -72,18 +59,9 @@ public class TaskRecallCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_RECALL);
+ command.setType(CommandType.TASK_REJECT);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
-
- @Override
- public String toString() {
- return "TaskRecallCommand{"
- + "taskInstanceId=" + taskInstanceId
- + ", host='" + host + '\''
- + ", processInstanceId=" + processInstanceId
- + '}';
- }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index dc8e1f0d36..edbaea13ce 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.remote.utils;
+import lombok.NonNull;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.io.Serializable;
@@ -95,7 +96,7 @@ public class Host implements Serializable {
* @param address address
* @return host
*/
- public static Host of(String address) {
+ public static Host of(@NonNull String address) {
String[] parts = splitAddress(address);
return new Host(parts[0], Integer.parseInt(parts[1]));
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index a8a401cde7..d97a994249 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -25,9 +25,10 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -79,9 +80,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private AlertClientService alertClientService;
- @Autowired
- private RetryReportTaskStatusThread retryReportTaskStatusThread;
-
@Autowired
private WorkerManagerThread workerManagerThread;
@@ -99,6 +97,12 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerRpcServer workerRpcServer;
+ @Autowired
+ private WorkerRpcClient workerRpcClient;
+
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+
/**
* worker server startup, not use web service
*
@@ -112,7 +116,7 @@ public class WorkerServer implements IStoppable {
@PostConstruct
public void run() {
this.workerRpcServer.start();
-
+ this.workerRpcClient.start();
this.taskPluginManager.installPlugin();
this.workerRegistryClient.registry();
@@ -122,7 +126,7 @@ public class WorkerServer implements IStoppable {
this.workerManagerThread.start();
- this.retryReportTaskStatusThread.start();
+ this.messageRetryRunner.start();
/*
* registry hooks, which are called before the process exits
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
deleted file mode 100644
index f8e47fc43a..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.cache;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.remote.command.Command;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Response Cache : cache worker send master result
- */
-public class ResponseCache {
-
- private static final ResponseCache instance = new ResponseCache();
-
- private ResponseCache() {
- }
-
- public static ResponseCache get() {
- return instance;
- }
-
- private final Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
- private final Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
- private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
-
- /**
- * cache response
- *
- * @param taskInstanceId taskInstanceId
- * @param command command
- * @param event event ACK/RESULT
- */
- public void cache(Integer taskInstanceId, Command command, TaskEventType event) {
- switch (event) {
- case RUNNING:
- runningCache.put(taskInstanceId, command);
- break;
- case RESULT:
- responseCache.put(taskInstanceId, command);
- break;
- case WORKER_REJECT:
- recallCache.put(taskInstanceId, command);
- break;
- default:
- throw new IllegalArgumentException("invalid event type : " + event);
- }
- }
-
- /**
- * recall response cache
- *
- * @param taskInstanceId taskInstanceId
- */
- public void removeRecallCache(Integer taskInstanceId) {
- recallCache.remove(taskInstanceId);
- }
-
- public Map<Integer, Command> getRecallCache() {
- return recallCache;
- }
-
- /**
- * remove running cache
- *
- * @param taskInstanceId taskInstanceId
- */
- public void removeRunningCache(Integer taskInstanceId) {
- runningCache.remove(taskInstanceId);
- }
-
- /**
- * remove response cache
- *
- * @param taskInstanceId taskInstanceId
- */
- public void removeResponseCache(Integer taskInstanceId) {
- responseCache.remove(taskInstanceId);
- }
-
- /**
- * get running cache
- *
- * @return getAckCache
- */
- public Map<Integer, Command> getRunningCache() {
- return runningCache;
- }
-
- /**
- * getResponseCache
- *
- * @return getResponseCache
- */
- public Map<Integer, Command> getResponseCache() {
- return responseCache;
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a641b28e1b..aab162d6d4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker.config;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+
import java.time.Duration;
import java.util.Set;
@@ -46,6 +48,10 @@ public class WorkerConfig implements Validator {
private Set<String> groups = Sets.newHashSet("default");
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
+ /**
+ * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
+ */
+ private String workerAddress;
@Override
public boolean supports(Class<?> clazz) {
@@ -64,6 +70,6 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
-
+ workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
new file mode 100644
index 0000000000..18dceb069b
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * Daemon thread that re-sends worker-to-master messages until they are removed from the retry set.
+ * <p>
+ * Messages are kept per task instance and per {@link CommandType}. A message is re-sent when more than
+ * {@code MESSAGE_RETRY_WINDOW} milliseconds have passed since its recorded send time.
+ */
+@Component
+public class MessageRetryRunner extends BaseDaemonThread {
+
+    private static final Logger logger = LoggerFactory.getLogger(MessageRetryRunner.class);
+
+    /**
+     * Minimum interval, in milliseconds, between two sends of the same message.
+     */
+    private static final long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5L).toMillis();
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    /**
+     * Sender for each message type, filled once by {@link #init()}.
+     */
+    private final Map<CommandType, MessageSender<BaseCommand>> messageSenderMap = new HashMap<>();
+
+    /**
+     * taskInstanceId -> (message type -> message that still needs to be retried).
+     */
+    private final Map<Integer, Map<CommandType, BaseCommand>> needToRetryMessages = new ConcurrentHashMap<>();
+
+    protected MessageRetryRunner() {
+        super("WorkerMessageRetryRunnerThread");
+    }
+
+    /**
+     * Register every {@link MessageSender} bean under the message type it handles.
+     */
+    @PostConstruct
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void init() {
+        Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
+        messageSenders.values().forEach(messageSender -> {
+            messageSenderMap.put(messageSender.getMessageType(), messageSender);
+            logger.info("Injected message sender: {}", messageSender.getClass().getName());
+        });
+    }
+
+    @Override
+    public synchronized void start() {
+        logger.info("Message retry runner starting");
+        super.start();
+        logger.info("Message retry runner started");
+    }
+
+    /**
+     * Track a message for retry, replacing any previous message of the same type for the task.
+     */
+    public void addRetryMessage(int taskInstanceId, @NonNull CommandType messageType, BaseCommand baseCommand) {
+        // compute() is atomic per key, so the put cannot land in a map that removeRetryMessage just dropped
+        needToRetryMessages.compute(taskInstanceId, (id, messages) -> {
+            Map<CommandType, BaseCommand> taskMessages = messages == null ? new ConcurrentHashMap<>() : messages;
+            taskMessages.put(messageType, baseCommand);
+            return taskMessages;
+        });
+    }
+
+    /**
+     * Stop retrying the given message type for the task; the task entry is dropped once it has no messages.
+     */
+    public void removeRetryMessage(int taskInstanceId, @NonNull CommandType messageType) {
+        needToRetryMessages.computeIfPresent(taskInstanceId, (id, messages) -> {
+            messages.remove(messageType);
+            return messages.isEmpty() ? null : messages;
+        });
+    }
+
+    /**
+     * Stop retrying all messages of the task.
+     */
+    public void removeRetryMessages(int taskInstanceId) {
+        needToRetryMessages.remove(taskInstanceId);
+    }
+
+    /**
+     * Redirect all pending messages of the task to a new receiver address.
+     */
+    public void updateMessageHost(int taskInstanceId, String messageReceiverHost) {
+        Map<CommandType, BaseCommand> taskMessages = needToRetryMessages.get(taskInstanceId);
+        if (taskMessages != null) {
+            taskMessages.values().forEach(message -> message.setMessageReceiverAddress(messageReceiverHost));
+        }
+    }
+
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                if (!needToRetryMessages.isEmpty()) {
+                    retryExpiredMessages(System.currentTimeMillis());
+                }
+                // poll at a short interval (instead of sleeping a whole retry window when idle) so a stop
+                // request is noticed promptly
+                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+            } catch (InterruptedException e) {
+                logger.warn("The message retry thread is interrupted, will break this loop", e);
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception ex) {
+                logger.error("Retry send message failed, get an unknown exception.", ex);
+            }
+        }
+    }
+
+    /**
+     * Retry every tracked message whose window has expired; MDC carries the task instance id while logging.
+     */
+    private void retryExpiredMessages(long now) {
+        for (Map.Entry<Integer, Map<CommandType, BaseCommand>> taskEntry : needToRetryMessages.entrySet()) {
+            LoggerUtils.setTaskInstanceIdMDC(taskEntry.getKey());
+            try {
+                for (Map.Entry<CommandType, BaseCommand> messageEntry : taskEntry.getValue().entrySet()) {
+                    retryIfExpired(messageEntry.getKey(), messageEntry.getValue(), now);
+                }
+            } finally {
+                LoggerUtils.removeTaskInstanceIdMDC();
+            }
+        }
+    }
+
+    /**
+     * Re-send a single message if its retry window has expired. Failures are logged and do not stop the
+     * retry of other messages.
+     */
+    private void retryIfExpired(CommandType messageType, BaseCommand message, long now) {
+        if (now - message.getMessageSendTime() <= MESSAGE_RETRY_WINDOW) {
+            return;
+        }
+        try {
+            logger.info("Begin retry send message to master, message: {}", message);
+            // record the attempt first, so a failed send is retried only after another full window
+            message.setMessageSendTime(now);
+            MessageSender<BaseCommand> messageSender = messageSenderMap.get(messageType);
+            if (messageSender == null) {
+                logger.error("Cannot find message sender for message type: {}, message: {}", messageType, message);
+                return;
+            }
+            messageSender.sendMessage(message);
+            logger.info("Success send message to master, message: {}", message);
+        } catch (Exception e) {
+            logger.warn("Retry send message to master error", e);
+        }
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
similarity index 53%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
index 000e3f4e02..519c5b2b3e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java
@@ -15,32 +15,29 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote.command;
+package org.apache.dolphinscheduler.server.worker.message;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.io.Serializable;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
-public class TaskExecuteRequestCommand implements Serializable {
-
- private static final long serialVersionUID = -1L;
-
- private TaskExecutionContext taskExecutionContext;
-
- public Command convert2Command() {
- Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_REQUEST);
- byte[] body = JSONUtils.toJsonByteArray(this);
- command.setBody(body);
- return command;
- }
-
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+
+public interface MessageSender<T extends BaseCommand> {
+
+ /**
+ * Send the message to its receiver.
+ *
+ * @throws RemotingException Cannot connect to the target host.
+ */
+ void sendMessage(T message) throws RemotingException;
+
+ /**
+ * Build the message from the task execution context and the message receiver address.
+ */
+ T buildMessage(TaskExecutionContext taskExecutionContext, String messageReceiverAddress);
+
+ /**
+ * The message type that this sender is able to send.
+ */
+ CommandType getMessageType();
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java
new file mode 100644
index 0000000000..a7d05a4791
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Builds and sends {@link TaskExecuteResultCommand} messages ({@link CommandType#TASK_EXECUTE_RESULT}).
+ */
+@Component
+public class TaskExecuteResultMessageSender implements MessageSender<TaskExecuteResultCommand> {
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    /**
+     * Send the message to {@code message.getMessageReceiverAddress()} through the worker RPC client.
+     *
+     * @throws RemotingException if the message cannot be sent to the receiver
+     */
+    @Override
+    public void sendMessage(TaskExecuteResultCommand message) throws RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+    }
+
+    /**
+     * Build a result message from the task execution context. The sender address is this worker's
+     * configured address and the send time is the current time.
+     *
+     * @param taskExecutionContext   context of the task whose result is reported
+     * @param messageReceiverAddress address the message will be sent to
+     * @return the populated result message
+     */
+    @Override
+    public TaskExecuteResultCommand buildMessage(TaskExecutionContext taskExecutionContext,
+                                                 String messageReceiverAddress) {
+        TaskExecuteResultCommand taskExecuteResultMessage =
+                new TaskExecuteResultCommand(workerConfig.getWorkerAddress(),
+                        messageReceiverAddress,
+                        System.currentTimeMillis());
+        taskExecuteResultMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskExecuteResultMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskExecuteResultMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        taskExecuteResultMessage.setLogPath(taskExecutionContext.getLogPath());
+        taskExecuteResultMessage.setExecutePath(taskExecutionContext.getExecutePath());
+        taskExecuteResultMessage.setAppIds(taskExecutionContext.getAppIds());
+        taskExecuteResultMessage.setProcessId(taskExecutionContext.getProcessId());
+        taskExecuteResultMessage.setHost(taskExecutionContext.getHost());
+        taskExecuteResultMessage.setStartTime(taskExecutionContext.getStartTime());
+        taskExecuteResultMessage.setEndTime(taskExecutionContext.getEndTime());
+        taskExecuteResultMessage.setVarPool(taskExecutionContext.getVarPool());
+        return taskExecuteResultMessage;
+    }
+
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_EXECUTE_RESULT;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
new file mode 100644
index 0000000000..7891f4be70
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * Builds and sends {@link TaskExecuteRunningCommand} messages ({@link CommandType#TASK_EXECUTE_RUNNING}).
+ */
+@Component
+public class TaskExecuteRunningMessageSender implements MessageSender<TaskExecuteRunningCommand> {
+
+    @Autowired
+    private WorkerRpcClient workerRpcClient;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    /**
+     * Send the message to {@code message.getMessageReceiverAddress()} through the worker RPC client.
+     *
+     * @throws RemotingException if the message cannot be sent to the receiver
+     */
+    @Override
+    public void sendMessage(TaskExecuteRunningCommand message) throws RemotingException {
+        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+    }
+
+    /**
+     * Build a running message from the task execution context. The sender address is this worker's
+     * configured address and the send time is the current time.
+     *
+     * @param taskExecutionContext   context of the running task, must not be null
+     * @param messageReceiverAddress address the message will be sent to, must not be null
+     * @return the populated running message
+     */
+    @Override
+    public TaskExecuteRunningCommand buildMessage(@NonNull TaskExecutionContext taskExecutionContext,
+                                                  @NonNull String messageReceiverAddress) {
+        TaskExecuteRunningCommand taskExecuteRunningMessage =
+                new TaskExecuteRunningCommand(workerConfig.getWorkerAddress(),
+                        messageReceiverAddress,
+                        System.currentTimeMillis());
+        taskExecuteRunningMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+        taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+        taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
+        taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
+        taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
+        taskExecuteRunningMessage.setExecutePath(taskExecutionContext.getExecutePath());
+        return taskExecuteRunningMessage;
+    }
+
+    @Override
+    public CommandType getMessageType() {
+        return CommandType.TASK_EXECUTE_RUNNING;
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
new file mode 100644
index 0000000000..d50c5d8997
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
+
+ @Autowired
+ private WorkerRpcClient workerRpcClient;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Override
+ public void sendMessage(TaskRejectCommand message) throws RemotingException {
+ workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
+ }
+
+ public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
+ TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
+ masterAddress,
+ System.currentTimeMillis());
+ taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+ taskRejectMessage.setHost(taskExecutionContext.getHost());
+ return taskRejectMessage;
+ }
+
+ @Override
+ public CommandType getMessageType() {
+ return CommandType.TASK_REJECT;
+ }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index d1ceca6c9e..d36c80012d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,22 +42,19 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class);
- /**
- * task callback service
- */
@Autowired
- private TaskCallbackService taskCallbackService;
+ private MessageRetryRunner messageRetryRunner;
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
if (updateCommand == null) {
logger.error("host update command is null");
return;
}
logger.info("received host update command : {}", updateCommand);
- taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
-
+ messageRetryRunner.updateMessageHost(updateCommand.getTaskInstanceId(), updateCommand.getProcessHost());
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
deleted file mode 100644
index 21616a1f60..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
-import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-
-
-/**
- * task callback service
- */
-@Service
-public class TaskCallbackService {
-
- private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
- private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
-
- @Autowired
- private TaskExecuteRunningAckProcessor taskExecuteRunningProcessor;
-
- @Autowired
- private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
-
- /**
- * remote channels
- */
- private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
-
- /**
- * netty remoting client
- */
- private final NettyRemotingClient nettyRemotingClient;
-
- public TaskCallbackService() {
- final NettyClientConfig clientConfig = new NettyClientConfig();
- this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
- }
-
- /**
- * add callback channel
- *
- * @param taskInstanceId taskInstanceId
- * @param channel channel
- */
- public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
- REMOTE_CHANNELS.put(taskInstanceId, channel);
- }
-
- /**
- * change remote channel
- */
- public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
- REMOTE_CHANNELS.put(taskInstanceId, channel);
- }
-
- /**
- * get callback channel
- *
- * @param taskInstanceId taskInstanceId
- * @return callback channel
- */
- private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
- Channel newChannel;
- NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
- if (nettyRemoteChannel != null) {
- if (nettyRemoteChannel.isActive()) {
- return Optional.of(nettyRemoteChannel);
- }
- newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
- if (newChannel != null) {
- return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
- }
- }
- return Optional.empty();
- }
-
- public long pause(int ntries) {
- return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
- }
-
- private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId) {
- NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
- addRemoteChannel(taskInstanceId, remoteChannel);
- return remoteChannel;
- }
-
- private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId) {
- NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
- addRemoteChannel(taskInstanceId, remoteChannel);
- return remoteChannel;
- }
-
- /**
- * remove callback channels
- *
- * @param taskInstanceId taskInstanceId
- */
- public static void remove(int taskInstanceId) {
- REMOTE_CHANNELS.remove(taskInstanceId);
- }
-
- /**
- * send result
- *
- * @param taskInstanceId taskInstanceId
- * @param command command
- */
- public void send(int taskInstanceId, Command command) {
- Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
- if (nettyRemoteChannel.isPresent()) {
- nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
- }
- }
- });
- } else {
- logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
- }
- }
-
- /**
- * build task execute running command
- *
- * @param taskExecutionContext taskExecutionContext
- * @return TaskExecuteAckCommand
- */
- private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
- command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
- command.setLogPath(taskExecutionContext.getLogPath());
- command.setHost(taskExecutionContext.getHost());
- command.setStartTime(taskExecutionContext.getStartTime());
- command.setExecutePath(taskExecutionContext.getExecutePath());
- return command;
- }
-
- /**
- * build task execute response command
- *
- * @param taskExecutionContext taskExecutionContext
- * @return TaskExecuteResponseCommand
- */
- private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
- command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
- command.setLogPath(taskExecutionContext.getLogPath());
- command.setExecutePath(taskExecutionContext.getExecutePath());
- command.setAppIds(taskExecutionContext.getAppIds());
- command.setProcessId(taskExecutionContext.getProcessId());
- command.setHost(taskExecutionContext.getHost());
- command.setStartTime(taskExecutionContext.getStartTime());
- command.setEndTime(taskExecutionContext.getEndTime());
- command.setVarPool(taskExecutionContext.getVarPool());
- command.setExecutePath(taskExecutionContext.getExecutePath());
- return command;
- }
-
- /**
- * build TaskKillResponseCommand
- *
- * @param taskExecutionContext taskExecutionContext
- * @return build TaskKillResponseCommand
- */
- private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
- TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
- taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
- taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
- taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskKillResponseCommand.setHost(taskExecutionContext.getHost());
- taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
- return taskKillResponseCommand;
- }
-
- private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
- TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
- taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- taskRecallCommand.setHost(taskExecutionContext.getHost());
- return taskRecallCommand;
- }
-
- /**
- * send task execute running command
- * todo unified callback command
- */
- public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
- // add response cache
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING);
- send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
- }
-
- /**
- * send task execute delay command
- * todo unified callback command
- */
- public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
- send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
- }
-
- /**
- * send task execute response command
- * todo unified callback command
- */
- public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
- // add response cache
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT);
- send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
- }
-
- public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
- TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
- send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
- }
-
- /**
- * send task execute response command
- */
- public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
- TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
- send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command());
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
similarity index 69%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index c14a2e891e..a9ce868c2d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -24,19 +24,18 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@@ -58,16 +57,13 @@ import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
/**
- * worker request processor
+ * Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
@Component
-public class TaskExecuteProcessor implements NettyRequestProcessor {
+public class TaskDispatchProcessor implements NettyRequestProcessor {
- private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
+ private static final Logger logger = LoggerFactory.getLogger(TaskDispatchProcessor.class);
- /**
- * worker config
- */
@Autowired
private WorkerConfig workerConfig;
@@ -75,7 +71,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* task callback service
*/
@Autowired
- private TaskCallbackService taskCallbackService;
+ private WorkerMessageSender workerMessageSender;
/**
* alert client service
@@ -99,26 +95,27 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
- TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
- command.getBody(), TaskExecuteRequestCommand.class);
+ TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
- if (taskRequestCommand == null) {
- logger.error("task execute request command is null");
+ if (taskDispatchCommand == null) {
+ logger.error("task execute request command content is null");
return;
}
- logger.info("task execute request command : {}", taskRequestCommand);
+ final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
+ logger.info("task execute request message: {}", taskDispatchCommand);
- TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
+ TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
@@ -127,7 +124,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// todo custom logger
- taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
+ taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
@@ -148,11 +145,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// check if the OS user exists
if (!osUserExistFlag) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
- taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.getTenantCode(),
+ taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+ masterAddress,
+ CommandType.TASK_EXECUTE_RESULT);
return;
}
@@ -164,33 +164,43 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
- logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
- logger.error("create executeLocalPath fail", ex);
+ logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
+ execLocalPath,
+ taskExecutionContext.getTaskInstanceId(),
+ ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+ masterAddress,
+ CommandType.TASK_EXECUTE_RESULT);
return;
}
}
- taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
-
// delay task process
- long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
+ long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+ taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
- logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
+ logger.info("delay the execution of task instance {}, delay time: {} s",
+ taskExecutionContext.getTaskInstanceId(),
+ remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
- taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
+ workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
}
// submit task to manager
- boolean offer = workerManager.offer(
- new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, storageOperate));
+ boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+ masterAddress,
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate));
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
- workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
- taskCallbackService.sendRecallCommand(taskExecutionContext);
+ workerManager.getWaitSubmitQueueSize(),
+ taskExecutionContext.getTaskInstanceId());
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
deleted file mode 100644
index b62770984a..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-
-/**
- * task execute running ack, from master to worker
- */
-@Component
-public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
-
- private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
-
- @Override
- public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
- String.format("invalid command type : %s", command.getType()));
-
- TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
- command.getBody(), TaskExecuteResponseAckCommand.class);
-
- if (taskExecuteResponseAckCommand == null) {
- logger.error("task execute response ack command is null");
- return;
- }
- logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
-
- if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
- TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
- logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
- taskExecuteResponseAckCommand.getTaskInstanceId());
- } else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
- // master handle worker response error, will still retry
- } else {
- throw new IllegalArgumentException("Invalid task execute response ack status: "
- + taskExecuteResponseAckCommand.getStatus());
- }
- }
-
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
new file mode 100644
index 0000000000..c60c283c7f
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+
+/**
+ * task execute running ack, from master to worker
+ */
+@Component
+public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(TaskExecuteResultAckProcessor.class);
+
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT_ACK == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ TaskExecuteAckCommand taskExecuteAckMessage = JSONUtils.parseObject(command.getBody(),
+ TaskExecuteAckCommand.class);
+
+ if (taskExecuteAckMessage == null) {
+ logger.error("task execute response ack command is null");
+ return;
+ }
+ logger.info("task execute response ack command : {}", taskExecuteAckMessage);
+
+ try {
+ LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
+ if (taskExecuteAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
+ CommandType.TASK_EXECUTE_RESULT);
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteAckMessage.getTaskInstanceId());
+ } else if (taskExecuteAckMessage.getStatus() == ExecutionStatus.FAILURE.getCode()) {
+ // master handle worker response error, will still retry
+ logger.error("Receive task execute result ack message, the message status is not success, message: {}",
+ taskExecuteAckMessage);
+ } else {
+ throw new IllegalArgumentException("Invalid task execute response ack status: "
+ + taskExecuteAckMessage.getStatus());
+ }
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
+
+ }
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 421e92a530..b85db0cb64 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -22,12 +22,13 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -42,13 +43,16 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ String.format("invalid command type : %s", command.getType()));
- TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
- command.getBody(), TaskExecuteRunningAckCommand.class);
+ TaskExecuteRunningAckMessage runningAckCommand = JSONUtils.parseObject(command.getBody(),
+ TaskExecuteRunningAckMessage.class);
if (runningAckCommand == null) {
logger.error("task execute running ack command is null");
return;
@@ -58,7 +62,8 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
logger.info("task execute running ack command : {}", runningAckCommand);
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
+ messageRetryRunner.removeRetryMessage(runningAckCommand.getTaskInstanceId(),
+ CommandType.TASK_EXECUTE_RUNNING);
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index ecbbe78128..7e18fbbbef 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -28,15 +28,17 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
-import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.log.LogClientService;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -47,7 +49,10 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
/**
* task kill processor
@@ -57,18 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
- /**
- * task callback service
- */
- @Autowired
- private TaskCallbackService taskCallbackService;
-
/**
* task execute manager
*/
@Autowired
private WorkerManagerThread workerManager;
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+
/**
* task kill process
*
@@ -77,7 +79,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
@@ -86,7 +89,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId();
- TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+ TaskExecutionContext taskExecutionContext
+ = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
@@ -96,25 +100,43 @@ public class TaskKillProcessor implements NettyRequestProcessor {
if (processId == 0) {
this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ sendTaskKillResponseCommand(channel, taskExecutionContext);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return;
}
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
- taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
- new NettyRemoteChannel(channel, command.getOpaque()));
-
- taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
+ taskExecutionContext.setCurrentExecutionStatus(
+ result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
+ sendTaskKillResponseCommand(channel, taskExecutionContext);
- taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
+ /**
+ * Write a {@link TaskKillResponseCommand} describing the kill outcome back on the channel the kill request
+ * arrived on. A failed write is only logged; it is not retried.
+ *
+ * @param channel channel the kill request was received on
+ * @param taskExecutionContext task whose status, app ids, host and process id are reported
+ */
+ private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
+ TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+ taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+ // The "not executed yet" path (processId == 0) does not set appIds, so it may be null here and an
+ // unguarded split() would throw before any response is written. An empty value maps to an empty list
+ // rather than to a list holding a single empty id.
+ String appIds = taskExecutionContext.getAppIds();
+ taskKillResponseCommand.setAppIds(Strings.isNullOrEmpty(appIds)
+ ? Collections.<String>emptyList()
+ : Arrays.asList(appIds.split(TaskConstants.COMMA)));
+ taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+ taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ channel.writeAndFlush(taskKillResponseCommand.convert2Command()).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ logger.error("Submit kill response to master error, kill command: {}", taskKillResponseCommand,
+ future.cause());
+ }
+ }
+ });
+ }
+
/**
* do kill
*
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
similarity index 50%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
index 769024e0aa..850630efe4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
@@ -18,15 +18,17 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@@ -34,25 +36,35 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
@Component
-public class TaskRecallAckProcessor implements NettyRequestProcessor {
+public class TaskRejectAckProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskRejectAckProcessor.class);
+
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+ /**
+ * Handle the master's ack for a TASK_REJECT message: a SUCCESS ack removes the task's pending TASK_REJECT
+ * retry message, any other status is logged and the retry message is kept.
+ */
 @Override
 public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(),
- String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.TASK_REJECT_ACK == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
- TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
- if (taskRecallAckCommand == null) {
+ TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(),
+ TaskRejectAckCommand.class);
+ if (taskRejectAckMessage == null) {
+ logger.error("task reject ack command is null");
 return;
 }
-
- if (taskRecallAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
- logger.debug("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
- TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
- logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
+ try {
+ LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
+ if (taskRejectAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
+ CommandType.TASK_REJECT);
+ logger.debug("remove TASK_REJECT retry message, task instance id:{}",
+ taskRejectAckMessage.getTaskInstanceId());
+ } else {
+ logger.error("Receive task reject ack message, the message status is not success, message: {}",
+ taskRejectAckMessage);
+ }
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
 }
 }
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
new file mode 100644
index 0000000000..5a2752ca34
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.message.MessageSender;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+/**
+ * Sends task related messages from the worker to the master.
+ * <p>
+ * Each message is built and sent by the {@link MessageSender} bean registered for its {@link CommandType}.
+ * Messages sent through {@link #sendMessageWithRetry} are also handed to the {@link MessageRetryRunner};
+ * the worker's ack processors remove them once the master acknowledges them with a success status.
+ */
+@Component
+public class WorkerMessageSender {
+
+ private final Logger logger = LoggerFactory.getLogger(WorkerMessageSender.class);
+
+ @Autowired
+ private MessageRetryRunner messageRetryRunner;
+
+ @Autowired
+ private ApplicationContext applicationContext;
+
+ private final Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
+
+ /**
+ * Index every {@link MessageSender} bean in the application context by the message type it handles.
+ */
+ @PostConstruct
+ public void init() {
+ Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
+ messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
+ messageSender));
+ }
+
+ // todo: use message rather than context
+ /**
+ * Build a message of the given type and send it, registering it for retry before the first attempt.
+ *
+ * @param taskExecutionContext the task the message describes
+ * @param messageReceiverAddress address the message is sent to
+ * @param messageType type of message to build
+ * @throws IllegalArgumentException if no {@link MessageSender} is registered for {@code messageType}
+ */
+ public void sendMessageWithRetry(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull String messageReceiverAddress,
+ @NonNull CommandType messageType) {
+ MessageSender messageSender = getMessageSender(messageType);
+ BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
+ try {
+ // register first so the message stays registered for retry even if this attempt fails
+ messageRetryRunner.addRetryMessage(taskExecutionContext.getTaskInstanceId(), messageType, baseCommand);
+ messageSender.sendMessage(baseCommand);
+ } catch (RemotingException e) {
+ logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand, e);
+ }
+ }
+
+ /**
+ * Build a message of the given type and send it once; a failed send is logged and not retried.
+ *
+ * @param taskExecutionContext the task the message describes
+ * @param messageReceiverAddress address the message is sent to
+ * @param messageType type of message to build
+ * @throws IllegalArgumentException if no {@link MessageSender} is registered for {@code messageType}
+ */
+ public void sendMessage(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull String messageReceiverAddress,
+ @NonNull CommandType messageType) {
+ MessageSender messageSender = getMessageSender(messageType);
+ BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress);
+ try {
+ messageSender.sendMessage(baseCommand);
+ } catch (RemotingException e) {
+ logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand, e);
+ }
+ }
+
+ /**
+ * Look up the sender registered for {@code messageType}.
+ *
+ * @throws IllegalArgumentException if no {@link MessageSender} is registered for the type
+ */
+ private MessageSender getMessageSender(CommandType messageType) {
+ MessageSender messageSender = messageSenderMap.get(messageType);
+ if (messageSender == null) {
+ throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType);
+ }
+ return messageSender;
+ }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
new file mode 100644
index 0000000000..4be20873f9
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * RPC client used by the worker to send messages to the master.
+ * <p>
+ * Replies are normally expected on {@link WorkerRpcServer}, but ack messages written back on this client's
+ * connections are handled as well: the same ack processors are registered here and on the server.
+ */
+@Component
+public class WorkerRpcClient implements AutoCloseable {
+
+ private final Logger logger = LoggerFactory.getLogger(WorkerRpcClient.class);
+
+ @Autowired
+ private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
+
+ @Autowired
+ private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
+
+ @Autowired
+ private TaskRejectAckProcessor taskRejectAckProcessor;
+
+ private NettyRemotingClient nettyRemotingClient;
+
+ /**
+ * Create the netty client and register the ack processors on it. Must be called before {@link #send}.
+ */
+ public void start() {
+ logger.info("Worker rpc client starting");
+ NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig);
+ // acks may come back on this client's connections, so register their processors here as well;
+ // this could be optimized by having the master send acks to the worker's netty server instead.
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,
+ taskExecuteRunningAckProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
+ logger.info("Worker rpc client started");
+ }
+
+ /**
+ * Send {@code command} to {@code host}.
+ *
+ * @param host the receiver
+ * @param command the command to send
+ * @throws IllegalStateException if {@link #start()} has not been called
+ * @throws RemotingException propagated from the underlying {@link NettyRemotingClient}
+ */
+ public void send(Host host, Command command) throws RemotingException {
+ if (nettyRemotingClient == null) {
+ throw new IllegalStateException("Worker rpc client is not started");
+ }
+ nettyRemotingClient.send(host, command);
+ }
+
+ /**
+ * Close the netty client. Does nothing if {@link #start()} was never called.
+ */
+ @Override
+ public void close() {
+ if (nettyRemotingClient == null) {
+ logger.info("Worker rpc client is not started, no need to close");
+ return;
+ }
+ logger.info("Worker rpc client closing");
+ nettyRemotingClient.close();
+ logger.info("Worker rpc client closed");
+ }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index be2ccc0ab4..e599046c57 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import java.io.Closeable;
@@ -42,19 +42,19 @@ public class WorkerRpcServer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
@Autowired
- private TaskExecuteProcessor taskExecuteProcessor;
+ private TaskDispatchProcessor taskDispatchProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
- private TaskRecallAckProcessor taskRecallAckProcessor;
+ private TaskRejectAckProcessor taskRejectAckProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
- private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+ private TaskExecuteResultAckProcessor taskExecuteResultAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@@ -72,12 +72,12 @@ public class WorkerRpcServer implements Closeable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_DISPATCH_REQUEST, taskDispatchProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
deleted file mode 100644
index 73b8d84cf7..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Retry Report Task Status Thread
- */
-@Component
-public class RetryReportTaskStatusThread extends BaseDaemonThread {
-
- private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
-
- /**
- * every 5 minutes
- */
- private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
-
- @Autowired
- private TaskCallbackService taskCallbackService;
-
- protected RetryReportTaskStatusThread() {
- super("RetryReportTaskStatusThread");
- }
-
- @Override
- public synchronized void start() {
- logger.info("Retry report task status thread starting");
- super.start();
- logger.info("Retry report task status thread started");
- }
-
- /**
- * retry ack/response
- */
- @Override
- public void run() {
- final ResponseCache instance = ResponseCache.get();
-
- while (Stopper.isRunning()) {
-
- // sleep 5 minutes
- ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
-
- try {
- // todo: Only retry the send failed command
- retryRunningCommand(instance);
- retryResponseCommand(instance);
- retryRecallCommand(instance);
- } catch (Exception e) {
- logger.warn("Retry report task status error", e);
- }
- }
- }
-
- private void retryRunningCommand(ResponseCache instance) {
- if (!instance.getRunningCache().isEmpty()) {
- Map<Integer, Command> runningCache = instance.getRunningCache();
- logger.info("Send task running retry command starting, waiting to retry size: {}", runningCache.size());
- for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command runningCommand = entry.getValue();
- try {
- taskCallbackService.send(taskInstanceId, runningCommand);
- } catch (Exception ex) {
- logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
- }
- }
- logger.info("Send task running retry command finished, waiting to retry size: {}", runningCache.size());
- }
- }
-
- private void retryResponseCommand(ResponseCache instance) {
- Map<Integer, Command> responseCache = instance.getResponseCache();
- if (!responseCache.isEmpty()) {
- logger.info("Send task response retry command starting, waiting to retry size: {}", responseCache.size());
- for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command responseCommand = entry.getValue();
- try {
- taskCallbackService.send(taskInstanceId, responseCommand);
- } catch (Exception ex) {
- logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
- }
- }
- logger.info("Send task response retry command finished, waiting to retry size: {}", responseCache.size());
- }
- }
-
- private void retryRecallCommand(ResponseCache instance) {
- Map<Integer, Command> recallCache = instance.getRecallCache();
- if (!recallCache.isEmpty()) {
- logger.info("Send task recall retry command starting, waiting to retry size: {}", recallCache.size());
- for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
- Integer taskInstanceId = entry.getKey();
- Command responseCommand = entry.getValue();
- try {
- taskCallbackService.send(taskInstanceId, responseCommand);
- } catch (Exception ex) {
- logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
- }
- }
- logger.info("Send task recall retry command finished, waiting to retry size: {}", recallCache.size());
- }
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 7e03f83844..9f2ad933ad 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -33,11 +33,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@@ -53,18 +53,18 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
+import lombok.NonNull;
+
/**
* task scheduler thread
*/
@@ -78,9 +78,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* task instance
*/
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext taskExecutionContext;
+
+ private final String masterAddress;
- private StorageOperate storageOperate;
+ private final StorageOperate storageOperate;
/**
* abstract task
@@ -90,12 +92,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* task callback service
*/
- private TaskCallbackService taskCallbackService;
+ private final WorkerMessageSender workerMessageSender;
/**
* alert client server
*/
- private AlertClientService alertClientService;
+ private final AlertClientService alertClientService;
private TaskPluginManager taskPluginManager;
@@ -103,25 +105,29 @@ public class TaskExecuteThread implements Runnable, Delayed {
* constructor
*
* @param taskExecutionContext taskExecutionContext
- * @param taskCallbackService taskCallbackService
+ * @param workerMessageSender used for worker send message to master
*/
- public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
- TaskCallbackService taskCallbackService,
- AlertClientService alertClientService,
+ public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull String masterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
- this.taskCallbackService = taskCallbackService;
+ this.masterAddress = masterAddress;
+ this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.storageOperate = storageOperate;
}
- public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
- TaskCallbackService taskCallbackService,
- AlertClientService alertClientService,
- TaskPluginManager taskPluginManager,
+ public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
+ @NonNull String masterAddress,
+ @NonNull WorkerMessageSender workerMessageSender,
+ @NonNull AlertClientService alertClientService,
+ @NonNull TaskPluginManager taskPluginManager,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
- this.taskCallbackService = taskCallbackService;
+ this.masterAddress = masterAddress;
+ this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
@@ -129,18 +135,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
- if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
- taskExecutionContext.setStartTime(new Date());
- taskExecutionContext.setEndTime(new Date());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
- logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
- taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
- return;
+ try {
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
+ if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
+ taskExecutionContext.setStartTime(new Date());
+ taskExecutionContext.setEndTime(new Date());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+ masterAddress,
+ CommandType.TASK_EXECUTE_RESULT);
+ logger.info("Task dry run success");
+ return;
+ }
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
- LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
+ LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@@ -149,10 +163,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
- taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+ masterAddress,
+ CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
- List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
+ List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
@@ -160,8 +177,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId()));
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
@@ -208,7 +225,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ workerMessageSender.sendMessageWithRetry(taskExecutionContext,
+ masterAddress,
+ CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index 67abf523ed..98de315345 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -68,8 +68,10 @@ public class WorkerExecService {
@Override
public void onFailure(Throwable throwable) {
- logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
- , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
+ logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
+ taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
+ taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
+ throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 1a541666fd..8b6365172a 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -21,12 +21,9 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -62,7 +59,7 @@ public class WorkerManagerThread implements Runnable {
* task callback service
*/
@Autowired
- private TaskCallbackService taskCallbackService;
+ private WorkerMessageSender workerMessageSender;
private volatile int workerExecThreads;
@@ -110,20 +107,8 @@ public class WorkerManagerThread implements Runnable {
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(waitSubmitQueue::remove);
- sendTaskKillResponse(taskInstanceId);
}
- /**
- * kill task before execute , like delay task
- */
- private void sendTaskKillResponse(Integer taskInstanceId) {
- TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- return;
- }
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL);
- taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
- }
/**
* submit task
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
similarity index 61%
rename from dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
rename to dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
index 2187a19281..08db9060a6 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
@@ -25,11 +25,12 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@@ -48,19 +49,20 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
- JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
+ JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
-public class TaskExecuteProcessorTest {
+public class TaskDispatchProcessorTest {
private TaskExecutionContext taskExecutionContext;
- private TaskCallbackService taskCallbackService;
+ private WorkerMessageSender workerMessageSender;
private ExecutorService workerExecService;
@@ -72,7 +74,7 @@ public class TaskExecuteProcessorTest {
private Command ackCommand;
- private TaskExecuteRequestCommand taskRequestCommand;
+ private TaskDispatchCommand taskRequestCommand;
private AlertClientService alertClientService;
@@ -86,66 +88,73 @@ public class TaskExecuteProcessorTest {
workerConfig.setExecThreads(1);
workerConfig.setListenPort(1234);
command = new Command();
- command.setType(CommandType.TASK_EXECUTE_REQUEST);
- ackCommand = new TaskExecuteRunningCommand().convert2Command();
- taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
+ command.setType(CommandType.TASK_DISPATCH_REQUEST);
+ ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
+ "127.0.0.1:5678",
+ System.currentTimeMillis()).convert2Command();
+ taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
+ "127.0.0.1:5678",
+ "127.0.0.1:1234",
+ System.currentTimeMillis());
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
- PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
- .thenReturn(null);
+ PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
- taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
- PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
+ workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
- .thenReturn(taskCallbackService);
- PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
- .thenReturn(workerConfig);
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
storageOperate = PowerMockito.mock(StorageOperate.class);
- PowerMockito.when(
- workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
- .thenReturn(Boolean.TRUE);
+ PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
+ "127.0.0.1:5678",
+ workerMessageSender,
+ alertClientService,
+ storageOperate))).thenReturn(Boolean.TRUE);
- PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
- .thenReturn(workerManager);
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
- PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
- .thenReturn(workerExecService);
+ PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
+ workerConfig.getExecThreads())).thenReturn(
+ workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
- PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
- .thenReturn(taskRequestCommand);
+ PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
+ taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
- PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
- .thenReturn(taskRequestCommand);
+ PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
+ taskRequestCommand);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId()))
- .thenReturn(taskExecutionContext.getExecutePath());
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId())).thenReturn(
+ taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
- SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
- null, null, null, alertClientService, storageOperate);
- PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
- .thenReturn(simpleTaskExecuteThread);
+ SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
+ workerMessageSender,
+ "127.0.0.1:5678",
+ LoggerFactory.getLogger(
+ TaskDispatchProcessorTest.class),
+ alertClientService,
+ storageOperate);
+ PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
}
@Test
public void testNormalExecution() {
- TaskExecuteProcessor processor = new TaskExecuteProcessor();
+ TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@@ -154,7 +163,7 @@ public class TaskExecuteProcessorTest {
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
- TaskExecuteProcessor processor = new TaskExecuteProcessor();
+ TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
@@ -179,11 +188,12 @@ public class TaskExecuteProcessorTest {
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
- TaskCallbackService taskCallbackService,
+ WorkerMessageSender workerMessageSender,
+ String masterAddress,
Logger taskLogger,
AlertClientService alertClientService,
StorageOperate storageOperate) {
- super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
+ super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
}
@Override
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index 90577111b9..bd492c7ff6 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@@ -48,7 +48,7 @@ public class TaskExecuteThreadTest {
private TaskExecutionContext taskExecutionContext;
@Mock
- private TaskCallbackService taskCallbackService;
+ private WorkerMessageSender workerMessageSender;
@Mock
private AlertClientService alertClientService;
@@ -61,8 +61,12 @@ public class TaskExecuteThreadTest {
@Test
public void checkTest() {
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
- alertClientService, taskPluginManager, storageOperate);
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
+ "127.0.0.1:5678",
+ workerMessageSender,
+ alertClientService,
+ taskPluginManager,
+ storageOperate);
String path = "/";
Map<String, String> projectRes = new HashMap<>();