You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/03/30 09:35:11 UTC
[dolphinscheduler] branch dev updated: [Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ddf1ff9 [Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)
ddf1ff9 is described below
commit ddf1ff98fa94e67ca29d24189d442fee8fbe3cda
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Mar 30 17:35:05 2022 +0800
[Improvement-7697][Master/Worker] Change the task ack to runnning callback (#8719)
* rebase dev
* change task state to dispatch if dispatch success
* update task host when dispatch
* add dispatch task event
* test
* check tenant after enable auto create
* handle dispatch state
Co-authored-by: caishunfeng <53...@qq.com>
---
.../apache/dolphinscheduler/common/Constants.java | 49 +++---
.../dolphinscheduler/common/enums/Event.java | 7 +-
.../common/enums/TaskStateType.java | 1 +
.../server/master/MasterServer.java | 12 +-
.../master/consumer/TaskPriorityQueueConsumer.java | 26 +++
.../dispatch/executor/NettyExecutorManager.java | 23 +--
...ssor.java => TaskExecuteResponseProcessor.java} | 30 ++--
...essor.java => TaskExecuteRunningProcessor.java} | 39 ++---
.../{TaskResponseEvent.java => TaskEvent.java} | 74 +++++----
...kResponseService.java => TaskEventService.java} | 150 ++++++++++-------
.../master/runner/task/BaseTaskProcessor.java | 7 +-
.../server/utils/DataQualityResultOperator.java | 6 +-
.../server/utils/DependentExecute.java | 1 +
.../master/processor/TaskAckProcessorTest.java | 91 +++++++++++
.../processor/queue/TaskResponseServiceTest.java | 68 +++++---
.../remote/command/CommandType.java | 16 +-
.../remote/command/StateEventResponseCommand.java | 2 +-
...and.java => TaskExecuteResponseAckCommand.java} | 13 +-
.../remote/command/TaskExecuteResponseCommand.java | 71 +++++++-
...mand.java => TaskExecuteRunningAckCommand.java} | 13 +-
...Command.java => TaskExecuteRunningCommand.java} | 55 +++++--
.../service/process/ProcessService.java | 182 ++++++++-------------
.../plugin/task/api/TaskExecutionContext.java | 63 ++++++-
.../plugin/task/api/enums/ExecutionStatus.java | 4 +-
.../server/worker/WorkerServer.java | 12 +-
.../server/worker/cache/ResponseCache.java | 34 ++--
.../worker/processor/TaskCallbackService.java | 131 +++++++++++++--
.../worker/processor/TaskExecuteProcessor.java | 70 +++-----
...r.java => TaskExecuteResponseAckProcessor.java} | 28 ++--
...or.java => TaskExecuteRunningAckProcessor.java} | 24 +--
.../server/worker/processor/TaskKillProcessor.java | 16 +-
.../worker/runner/RetryReportTaskStatusThread.java | 16 +-
.../server/worker/runner/TaskExecuteThread.java | 106 +++---------
.../server/worker/runner/WorkerManagerThread.java | 2 +-
.../worker/processor/TaskExecuteProcessorTest.java | 16 +-
35 files changed, 873 insertions(+), 585 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e149c2e..0572e8f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.common;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.regex.Pattern;
@@ -48,20 +49,20 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
- public static final String FORMAT_SS ="%s%s";
- public static final String FORMAT_S_S ="%s/%s";
- public static final String AWS_ACCESS_KEY_ID="aws.access.key.id";
- public static final String AWS_SECRET_ACCESS_KEY="aws.secret.access.key";
- public static final String AWS_REGION="aws.region";
- public static final String FOLDER_SEPARATOR ="/";
+ public static final String FORMAT_SS = "%s%s";
+ public static final String FORMAT_S_S = "%s/%s";
+ public static final String AWS_ACCESS_KEY_ID = "aws.access.key.id";
+ public static final String AWS_SECRET_ACCESS_KEY = "aws.secret.access.key";
+ public static final String AWS_REGION = "aws.region";
+ public static final String FOLDER_SEPARATOR = "/";
public static final String RESOURCE_TYPE_FILE = "resources";
- public static final String RESOURCE_TYPE_UDF="udfs";
+ public static final String RESOURCE_TYPE_UDF = "udfs";
- public static final String STORAGE_S3="S3";
+ public static final String STORAGE_S3 = "S3";
- public static final String STORAGE_HDFS="HDFS";
+ public static final String STORAGE_HDFS = "HDFS";
public static final String BUCKET_NAME = "dolphinscheduler-test";
@@ -71,7 +72,6 @@ public final class Constants {
public static final String FS_DEFAULT_FS = "fs.defaultFS";
-
/**
* hadoop configuration
*/
@@ -254,7 +254,7 @@ public final class Constants {
* user name regex
*/
public static final Pattern REGEX_USER_NAME = Pattern.compile("^[a-zA-Z0-9._-]{3,39}$");
-
+
/**
* read permission
*/
@@ -424,7 +424,7 @@ public final class Constants {
/**
* process or task definition first version
*/
- public static final int VERSION_FIRST = 1;
+ public static final int VERSION_FIRST = 1;
/**
* date format of yyyyMMdd
@@ -584,7 +584,6 @@ public final class Constants {
public static final long DEPENDENT_ALL_TASK_CODE = 0;
-
/**
* preview schedule execute count
*/
@@ -640,20 +639,22 @@ public final class Constants {
*/
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
- public static final int[] NOT_TERMINATED_STATES = new int[] {
- ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal(),
- ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
- ExecutionStatus.WAITING_THREAD.ordinal(),
- ExecutionStatus.WAITING_DEPEND.ordinal()
+ public static final int[] NOT_TERMINATED_STATES = new int[]{
+ ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal(),
+ ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
+ ExecutionStatus.WAITING_THREAD.ordinal(),
+ ExecutionStatus.WAITING_DEPEND.ordinal()
};
- public static final int[] RUNNING_PROCESS_STATE = new int[] {
+ public static final int[] RUNNING_PROCESS_STATE = new int[]{
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.SERIAL_WAIT.ordinal()
};
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
index 9cec276..78b036f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.common.enums;
public enum Event {
- ACK,
- RESULT;
+ DISPATCH,
+ DELAY,
+ RUNNING,
+ RESULT,
+ ;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
index f544b41..482cc65 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
@@ -54,6 +54,7 @@ public enum TaskStateType {
};
case RUNNING:
return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 22b4a69..af050ae 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,10 +27,10 @@ import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@@ -75,10 +75,10 @@ public class MasterServer implements IStoppable {
private Scheduler scheduler;
@Autowired
- private TaskAckProcessor taskAckProcessor;
+ private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
- private TaskResponseProcessor taskResponseProcessor;
+ private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskEventProcessor taskEventProcessor;
@@ -115,8 +115,8 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index d7d5494..06bbc30 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -24,11 +24,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
@@ -79,6 +82,11 @@ public class TaskPriorityQueueConsumer extends Thread {
@Autowired
private ExecutorDispatcher dispatcher;
+ /**
+ * processInstance cache manager
+ */
+ @Autowired
+ private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
* master config
@@ -87,6 +95,12 @@ public class TaskPriorityQueueConsumer extends Thread {
private MasterConfig masterConfig;
/**
+ * task response service
+ */
+ @Autowired
+ private TaskEventService taskEventService;
+
+ /**
* consumer thread pool
*/
private ThreadPoolExecutor consumerThreadPoolExecutor;
@@ -168,12 +182,24 @@ public class TaskPriorityQueueConsumer extends Thread {
}
result = dispatcher.dispatch(executionContext);
+
+ if (result) {
+ addDispatchEvent(context, executionContext);
+ }
} catch (RuntimeException | ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage(), e);
}
return result;
}
+ /**
+ * add dispatch event
+ */
+ private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
+ TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
+ taskEventService.addEvent(taskEvent);
+ }
+
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 0e5333b..0ba24e2 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -26,9 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
@@ -46,7 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * netty executor manager
+ * netty executor manager
*/
@Service
public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@@ -60,13 +60,13 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
private ServerNodeManager serverNodeManager;
@Autowired
- private TaskAckProcessor taskAckProcessor;
+ private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
- private TaskResponseProcessor taskResponseProcessor;
+ private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
/**
* netty remote client
@@ -83,13 +83,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@PostConstruct
public void init() {
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
- this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
}
/**
* execute logic
+ *
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
@@ -119,7 +120,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
boolean success = false;
while (!success) {
try {
- doExecute(host,command);
+ doExecute(host, command);
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
@@ -150,7 +151,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
}
/**
- * execute logic
+ * execute logic
+ *
* @param host host
* @param command command
* @throws ExecuteException if error throws ExecuteException
@@ -178,7 +180,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
}
/**
- * get all nodes
+ * get all nodes
+ *
* @param context context
* @return nodes
*/
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
similarity index 64%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
index 47bdfca..264e42e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
@@ -18,13 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,15 +35,15 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
- * task response processor
+ * task execute response processor
*/
@Component
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseProcessor.class);
@Autowired
- private TaskResponseService taskResponseService;
+ private TaskEventService taskEventService;
/**
* task final result response
@@ -57,19 +56,10 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
- TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
- logger.info("received command : {}", responseCommand);
+ TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
+ logger.info("received command : {}", taskExecuteResponseCommand);
- // TaskResponseEvent
- TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
- responseCommand.getEndTime(),
- responseCommand.getProcessId(),
- responseCommand.getAppIds(),
- responseCommand.getTaskInstanceId(),
- responseCommand.getVarPool(),
- channel,
- responseCommand.getProcessInstanceId()
- );
- taskResponseService.addResponse(taskResponseEvent);
+ TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
+ taskEventService.addEvent(taskResponseEvent);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
similarity index 54%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
index 1d188a1..c5ddc3d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java
@@ -18,14 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,15 +35,15 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
- * task ack processor
+ * task execute running processor
*/
@Component
-public class TaskAckProcessor implements NettyRequestProcessor {
+public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningProcessor.class);
@Autowired
- private TaskResponseService taskResponseService;
+ private TaskEventService taskEventService;
/**
* task ack process
@@ -55,25 +53,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
- TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
- logger.info("taskAckCommand : {}", taskAckCommand);
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
+ TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
+ logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
- String workerAddress = ChannelUtils.toAddress(channel).getAddress();
-
- ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
-
- // TaskResponseEvent
- TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
- taskAckCommand.getStartTime(),
- workerAddress,
- taskAckCommand.getExecutePath(),
- taskAckCommand.getLogPath(),
- taskAckCommand.getTaskInstanceId(),
- channel,
- taskAckCommand.getProcessInstanceId());
-
- taskResponseService.addResponse(taskResponseEvent);
+ TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+ taskEventService.addEvent(taskEvent);
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
similarity index 69%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 2a6bd07..865eee5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@@ -27,7 +30,7 @@ import io.netty.channel.Channel;
/**
* task event
*/
-public class TaskResponseEvent {
+public class TaskEvent {
/**
* taskInstanceId
@@ -90,46 +93,45 @@ public class TaskResponseEvent {
private Channel channel;
private int processInstanceId;
-
- public static TaskResponseEvent newAck(ExecutionStatus state,
- Date startTime,
- String workerAddress,
- String executePath,
- String logPath,
- int taskInstanceId,
- Channel channel,
- int processInstanceId) {
- TaskResponseEvent event = new TaskResponseEvent();
- event.setState(state);
- event.setStartTime(startTime);
- event.setWorkerAddress(workerAddress);
- event.setExecutePath(executePath);
- event.setLogPath(logPath);
+
+ public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstanceId, String workerAddress) {
+ TaskEvent event = new TaskEvent();
+ event.setProcessInstanceId(processInstanceId);
event.setTaskInstanceId(taskInstanceId);
- event.setEvent(Event.ACK);
+ event.setWorkerAddress(workerAddress);
+ event.setEvent(Event.DISPATCH);
+ return event;
+ }
+
+ public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
+ TaskEvent event = new TaskEvent();
+ event.setProcessInstanceId(command.getProcessInstanceId());
+ event.setTaskInstanceId(command.getTaskInstanceId());
+ event.setState(ExecutionStatus.of(command.getStatus()));
+ event.setStartTime(command.getStartTime());
+ event.setExecutePath(command.getExecutePath());
+ event.setLogPath(command.getLogPath());
event.setChannel(channel);
- event.setProcessInstanceId(processInstanceId);
+ event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+ event.setEvent(Event.RUNNING);
return event;
}
- public static TaskResponseEvent newResult(ExecutionStatus state,
- Date endTime,
- int processId,
- String appIds,
- int taskInstanceId,
- String varPool,
- Channel channel,
- int processInstanceId) {
- TaskResponseEvent event = new TaskResponseEvent();
- event.setState(state);
- event.setEndTime(endTime);
- event.setProcessId(processId);
- event.setAppIds(appIds);
- event.setTaskInstanceId(taskInstanceId);
- event.setEvent(Event.RESULT);
- event.setVarPool(varPool);
+ public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
+ TaskEvent event = new TaskEvent();
+ event.setProcessInstanceId(command.getProcessInstanceId());
+ event.setTaskInstanceId(command.getTaskInstanceId());
+ event.setState(ExecutionStatus.of(command.getStatus()));
+ event.setStartTime(command.getStartTime());
+ event.setExecutePath(command.getExecutePath());
+ event.setLogPath(command.getLogPath());
+ event.setEndTime(command.getEndTime());
+ event.setProcessId(command.getProcessId());
+ event.setAppIds(command.getAppIds());
+ event.setVarPool(command.getVarPool());
event.setChannel(channel);
- event.setProcessInstanceId(processInstanceId);
+ event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
+ event.setEvent(Event.RESULT);
return event;
}
@@ -140,7 +142,7 @@ public class TaskResponseEvent {
public void setVarPool(String varPool) {
this.varPool = varPool;
}
-
+
public int getTaskInstanceId() {
return taskInstanceId;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
similarity index 50%
rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
index a0de0af..4984390 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
@@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
@@ -50,17 +50,17 @@ import io.netty.channel.Channel;
* task manager
*/
@Component
-public class TaskResponseService {
+public class TaskEventService {
/**
* logger
*/
- private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
/**
* attemptQueue
*/
- private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>();
+ private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
/**
* process service
@@ -75,9 +75,9 @@ public class TaskResponseService {
private DataQualityResultOperator dataQualityResultOperator;
/**
- * task response worker
+ * task event worker
*/
- private Thread taskResponseWorker;
+ private Thread taskEventWorker;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@@ -87,19 +87,19 @@ public class TaskResponseService {
@PostConstruct
public void start() {
- this.taskResponseWorker = new TaskResponseWorker();
- this.taskResponseWorker.setName("StateEventResponseWorker");
- this.taskResponseWorker.start();
+ this.taskEventWorker = new TaskEventWorker();
+ this.taskEventWorker.setName("TaskStateEventWorker");
+ this.taskEventWorker.start();
}
@PreDestroy
public void stop() {
try {
- this.taskResponseWorker.interrupt();
+ this.taskEventWorker.interrupt();
if (!eventQueue.isEmpty()) {
- List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
+ List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
- for (TaskResponseEvent event : remainEvents) {
+ for (TaskEvent event : remainEvents) {
this.persist(event);
}
}
@@ -109,15 +109,15 @@ public class TaskResponseService {
}
/**
- * put task to attemptQueue
+ * add event to queue
*
- * @param taskResponseEvent taskResponseEvent
+ * @param taskEvent taskEvent
*/
- public void addResponse(TaskResponseEvent taskResponseEvent) {
+ public void addEvent(TaskEvent taskEvent) {
try {
- eventQueue.put(taskResponseEvent);
+ eventQueue.put(taskEvent);
} catch (InterruptedException e) {
- logger.error("put task : {} error :{}", taskResponseEvent, e);
+ logger.error("add task event : {} error :{}", taskEvent, e);
Thread.currentThread().interrupt();
}
}
@@ -125,7 +125,7 @@ public class TaskResponseService {
/**
* task worker thread
*/
- class TaskResponseWorker extends Thread {
+ class TaskEventWorker extends Thread {
@Override
public void run() {
@@ -133,8 +133,8 @@ public class TaskResponseService {
while (Stopper.isRunning()) {
try {
// if not task , blocking here
- TaskResponseEvent taskResponseEvent = eventQueue.take();
- persist(taskResponseEvent);
+ TaskEvent taskEvent = eventQueue.take();
+ persist(taskEvent);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
@@ -147,14 +147,14 @@ public class TaskResponseService {
}
/**
- * persist taskResponseEvent
+ * persist task event
*
- * @param taskResponseEvent taskResponseEvent
+ * @param taskEvent taskEvent
*/
- private void persist(TaskResponseEvent taskResponseEvent) {
- Event event = taskResponseEvent.getEvent();
- int taskInstanceId = taskResponseEvent.getTaskInstanceId();
- int processInstanceId = taskResponseEvent.getProcessInstanceId();
+ private void persist(TaskEvent taskEvent) {
+ Event event = taskEvent.getEvent();
+ int taskInstanceId = taskEvent.getTaskInstanceId();
+ int processInstanceId = taskEvent.getProcessInstanceId();
TaskInstance taskInstance;
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
@@ -165,75 +165,103 @@ public class TaskResponseService {
}
switch (event) {
- case ACK:
- handleAckEvent(taskResponseEvent, taskInstance);
+ case DISPATCH:
+ handleDispatchEvent(taskEvent, taskInstance);
+ // dispatch event do not need to submit state event
+ return;
+ case DELAY:
+ case RUNNING:
+ handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
- handleResultEvent(taskResponseEvent, taskInstance);
+ handleResultEvent(taskEvent, taskInstance);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
StateEvent stateEvent = new StateEvent();
- stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
- stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
- stateEvent.setExecutionStatus(taskResponseEvent.getState());
+ stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
+ stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
+ stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
/**
- * handle ack event
+ * handle dispatch event
*/
- private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
- Channel channel = taskResponseEvent.getChannel();
+ private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+ if (taskInstance == null) {
+ logger.error("taskInstance is null");
+ return;
+ }
+ if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
+ return;
+ }
+ taskInstance.setState(ExecutionStatus.DISPATCH);
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ processService.saveTaskInstance(taskInstance);
+ }
+
+ /**
+ * handle running event
+ */
+ private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+ Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
if (taskInstance.getState().typeIsFinished()) {
- logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
+ logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
} else {
- processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath()
- );
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ processService.saveTaskInstance(taskInstance);
}
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
+ TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker ack master error", e);
- DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskAckCommand.convert2Command());
+ TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+ channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
}
/**
* handle result event
*/
- private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
- Channel channel = taskResponseEvent.getChannel();
+ private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
+ Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
- dataQualityResultOperator.operateDqExecuteResult(taskResponseEvent, taskInstance);
-
- processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getVarPool()
- );
+ dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+ taskInstance.setStartTime(taskEvent.getStartTime());
+ taskInstance.setHost(taskEvent.getWorkerAddress());
+ taskInstance.setLogPath(taskEvent.getLogPath());
+ taskInstance.setExecutePath(taskEvent.getExecutePath());
+ taskInstance.setPid(taskEvent.getProcessId());
+ taskInstance.setAppLink(taskEvent.getAppIds());
+ taskInstance.setState(taskEvent.getState());
+ taskInstance.setEndTime(taskEvent.getEndTime());
+ taskInstance.setVarPool(taskEvent.getVarPool());
+ processService.changeOutParam(taskInstance);
+ processService.saveTaskInstance(taskInstance);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
- DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskResponseCommand.convert2Command());
+ TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
+ channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
- DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
- channel.writeAndFlush(taskResponseCommand.convert2Command());
+ TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+ channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index c53a02e..38fef8f 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -259,11 +259,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
- processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- null);
+ taskInstance.setState(ExecutionStatus.FAILURE);
+ processService.saveTaskInstance(taskInstance);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
index 906c3a9..d172228 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DataQualityResultOperator.java
@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.CheckType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqFailureStrategy;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OperatorType;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -59,7 +59,7 @@ public class DataQualityResultOperator {
* @param taskResponseEvent
* @param taskInstance
*/
- public void operateDqExecuteResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
+ public void operateDqExecuteResult(TaskEvent taskResponseEvent, TaskInstance taskInstance) {
if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
ProcessInstance processInstance =
@@ -92,7 +92,7 @@ public class DataQualityResultOperator {
* @param dqExecuteResult
* @param processInstance
*/
- private void checkDqExecuteResult(TaskResponseEvent taskResponseEvent,
+ private void checkDqExecuteResult(TaskEvent taskResponseEvent,
DqExecuteResult dqExecuteResult,
ProcessInstance processInstance) {
if (isFailure(dqExecuteResult)) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 7429dc2..1a744ea 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -231,6 +231,7 @@ public class DependentExecute {
if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
+ || state == ExecutionStatus.DISPATCH
|| state == ExecutionStatus.WAITING_THREAD) {
return DependResult.WAITING;
} else {
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
new file mode 100644
index 0000000..fc320ab
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+/**
+ * task ack processor test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskEvent.class})
+public class TaskAckProcessorTest {
+
+ private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
+ private TaskEventService taskEventService;
+ private ProcessService processService;
+ private TaskExecuteRunningCommand taskExecuteRunningCommand;
+ private TaskEvent taskResponseEvent;
+ private Channel channel;
+
+ @Before
+ public void before() {
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+
+ taskEventService = PowerMockito.mock(TaskEventService.class);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskEventService.class)).thenReturn(taskEventService);
+
+ processService = PowerMockito.mock(ProcessService.class);
+ PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
+
+ taskExecuteRunningProcessor = new TaskExecuteRunningProcessor();
+
+ channel = PowerMockito.mock(Channel.class);
+ taskResponseEvent = PowerMockito.mock(TaskEvent.class);
+
+ taskExecuteRunningCommand = new TaskExecuteRunningCommand();
+ taskExecuteRunningCommand.setStatus(1);
+ taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
+ taskExecuteRunningCommand.setHost("localhost");
+ taskExecuteRunningCommand.setLogPath("/temp/worker.log");
+ taskExecuteRunningCommand.setStartTime(new Date());
+ taskExecuteRunningCommand.setTaskInstanceId(1);
+ taskExecuteRunningCommand.setProcessInstanceId(1);
+ }
+
+ @Test
+ public void testProcess() {
+// Command command = taskExecuteAckCommand.convert2Command();
+// Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType());
+// InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345);
+// PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress);
+// PowerMockito.mockStatic(TaskResponseEvent.class);
+//
+// PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), channel))
+// .thenReturn(taskResponseEvent);
+// TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class);
+// PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
+//
+// taskAckProcessor.process(channel,command);
+ }
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 9794bdd..9a054b6 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -19,9 +19,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.net.InetSocketAddress;
import java.util.Date;
import org.junit.After;
@@ -42,41 +47,52 @@ public class TaskResponseServiceTest {
private ProcessService processService;
@InjectMocks
- TaskResponseService taskRspService;
+ TaskEventService taskEventService;
@Mock
private Channel channel;
- private TaskResponseEvent ackEvent;
+ private TaskEvent ackEvent;
- private TaskResponseEvent resultEvent;
+ private TaskEvent resultEvent;
private TaskInstance taskInstance;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
+ @Mock
+ private DataQualityResultOperator dataQualityResultOperator;
+
+ @Mock
+ private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
@Before
public void before() {
- taskRspService.start();
-
- ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,
- new Date(),
- "127.*.*.*",
- "path",
- "logPath",
- 22,
- channel,
- 1);
-
- resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS,
- new Date(),
- 1,
- "ids",
- 22,
- "varPol",
- channel,
- 1);
+ taskEventService.start();
+
+ Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
+
+ TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
+ taskExecuteRunningCommand.setProcessId(1);
+ taskExecuteRunningCommand.setTaskInstanceId(22);
+ taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
+ taskExecuteRunningCommand.setExecutePath("path");
+ taskExecuteRunningCommand.setLogPath("logPath");
+ taskExecuteRunningCommand.setHost("127.*.*.*");
+ taskExecuteRunningCommand.setStartTime(new Date());
+
+ ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
+
+ TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
+ taskExecuteResponseCommand.setProcessInstanceId(1);
+ taskExecuteResponseCommand.setTaskInstanceId(22);
+ taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
+ taskExecuteResponseCommand.setEndTime(new Date());
+ taskExecuteResponseCommand.setVarPool("varPol");
+ taskExecuteResponseCommand.setAppIds("ids");
+ taskExecuteResponseCommand.setProcessId(1);
+ resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);
@@ -87,14 +103,14 @@ public class TaskResponseServiceTest {
public void testAddResponse() {
Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
- taskRspService.addResponse(ackEvent);
- taskRspService.addResponse(resultEvent);
+ taskEventService.addEvent(ackEvent);
+ taskEventService.addEvent(resultEvent);
}
@After
public void after() {
- if (taskRspService != null) {
- taskRspService.stop();
+ if (taskEventService != null) {
+ taskEventService.stop();
}
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index a502242..5718872 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -69,24 +69,24 @@ public enum CommandType {
TASK_EXECUTE_REQUEST,
/**
- * execute task ack
+ * task execute running, from worker to master
*/
- TASK_EXECUTE_ACK,
+ TASK_EXECUTE_RUNNING,
/**
- * execute task response
+ * task execute running ack, from master to worker
*/
- TASK_EXECUTE_RESPONSE,
+ TASK_EXECUTE_RUNNING_ACK,
/**
- * db task ack
+ * task execute response, from worker to master
*/
- DB_TASK_ACK,
+ TASK_EXECUTE_RESPONSE,
/**
- * db task response
+ * task execute response ack, from master to worker
*/
- DB_TASK_RESPONSE,
+ TASK_EXECUTE_RESPONSE_ACK,
/**
* kill task
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
index fd9c428..41ee1ef 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventResponseCommand.java
@@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.DB_TASK_RESPONSE);
+ command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
similarity index 83%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
index 9bd86cb..f3df257 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseAckCommand.java
@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
- * db task final result response command
+ * task execute response ack command
+ * from master to worker
*/
-public class DBTaskResponseCommand implements Serializable {
+public class TaskExecuteResponseAckCommand implements Serializable {
private int taskInstanceId;
private int status;
- public DBTaskResponseCommand() {
+ public TaskExecuteResponseAckCommand() {
super();
}
- public DBTaskResponseCommand(int status, int taskInstanceId) {
+ public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
@@ -61,7 +62,7 @@ public class DBTaskResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.DB_TASK_RESPONSE);
+ command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
@@ -69,7 +70,7 @@ public class DBTaskResponseCommand implements Serializable {
@Override
public String toString() {
- return "DBTaskResponseCommand{"
+ return "TaskExecuteResponseAckCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", status=" + status
+ '}';
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index eafb803..4574b88 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.Date;
/**
- * execute task response command
+ * execute task response command
*/
public class TaskExecuteResponseCommand implements Serializable {
@@ -36,23 +36,43 @@ public class TaskExecuteResponseCommand implements Serializable {
}
/**
- * task instance id
+ * task instance id
*/
private int taskInstanceId;
/**
* process instance id
*/
- private int processInstanceId;
+ private int processInstanceId;
/**
- * status
+ * status
*/
private int status;
+ /**
+ * startTime
+ */
+ private Date startTime;
+
+ /**
+ * host
+ */
+ private String host;
+
+ /**
+ * logPath
+ */
+ private String logPath;
/**
- * end time
+ * executePath
+ */
+ private String executePath;
+
+
+ /**
+ * end time
*/
private Date endTime;
@@ -72,6 +92,38 @@ public class TaskExecuteResponseCommand implements Serializable {
*/
private String varPool;
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
+ public String getExecutePath() {
+ return executePath;
+ }
+
+ public void setExecutePath(String executePath) {
+ this.executePath = executePath;
+ }
+
public void setVarPool(String varPool) {
this.varPool = varPool;
}
@@ -79,7 +131,7 @@ public class TaskExecuteResponseCommand implements Serializable {
public String getVarPool() {
return varPool;
}
-
+
public int getTaskInstanceId() {
return taskInstanceId;
}
@@ -122,6 +174,7 @@ public class TaskExecuteResponseCommand implements Serializable {
/**
* package response command
+ *
* @return command
*/
public Command convert2Command() {
@@ -136,10 +189,16 @@ public class TaskExecuteResponseCommand implements Serializable {
public String toString() {
return "TaskExecuteResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ + ", processInstanceId=" + processInstanceId
+ ", status=" + status
+ + ", startTime=" + startTime
+ ", endTime=" + endTime
+ + ", host=" + host
+ + ", logPath=" + logPath
+ + ", executePath=" + executePath
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ + ", varPool=" + varPool
+ '}';
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
similarity index 80%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
index 4797104..b0bb666 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/DBTaskAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningAckCommand.java
@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
- * db task ack request command
+ * task execute running ack command
+ * from master to worker
*/
-public class DBTaskAckCommand implements Serializable {
+public class TaskExecuteRunningAckCommand implements Serializable {
private int taskInstanceId;
private int status;
- public DBTaskAckCommand() {
+ public TaskExecuteRunningAckCommand() {
super();
}
- public DBTaskAckCommand(int status, int taskInstanceId) {
+ public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
@@ -61,7 +62,7 @@ public class DBTaskAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.DB_TASK_ACK);
+ command.setType(CommandType.TASK_EXECUTE_RUNNING_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
@@ -69,6 +70,6 @@ public class DBTaskAckCommand implements Serializable {
@Override
public String toString() {
- return "DBTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
+ return "TaskExecuteRunningAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
similarity index 81%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
index a3dbed5..0a2eac2 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java
@@ -23,9 +23,10 @@ import java.io.Serializable;
import java.util.Date;
/**
- * execute task request command
+ * task execute running command
+ * from worker to master
*/
-public class TaskExecuteAckCommand implements Serializable {
+public class TaskExecuteRunningCommand implements Serializable {
/**
* taskInstanceId
@@ -62,6 +63,16 @@ public class TaskExecuteAckCommand implements Serializable {
*/
private String executePath;
+ /**
+ * processId
+ */
+ private int processId;
+
+ /**
+ * appIds
+ */
+ private String appIds;
+
public Date getStartTime() {
return startTime;
}
@@ -94,6 +105,14 @@ public class TaskExecuteAckCommand implements Serializable {
this.taskInstanceId = taskInstanceId;
}
+ public int getProcessInstanceId() {
+ return processInstanceId;
+ }
+
+ public void setProcessInstanceId(int processInstanceId) {
+ this.processInstanceId = processInstanceId;
+ }
+
public String getLogPath() {
return logPath;
}
@@ -110,6 +129,22 @@ public class TaskExecuteAckCommand implements Serializable {
this.executePath = executePath;
}
+ public int getProcessId() {
+ return processId;
+ }
+
+ public void setProcessId(int processId) {
+ this.processId = processId;
+ }
+
+ public String getAppIds() {
+ return appIds;
+ }
+
+ public void setAppIds(String appIds) {
+ this.appIds = appIds;
+ }
+
/**
* package request command
*
@@ -117,7 +152,7 @@ public class TaskExecuteAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
- command.setType(CommandType.TASK_EXECUTE_ACK);
+ command.setType(CommandType.TASK_EXECUTE_RUNNING);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
@@ -125,22 +160,16 @@ public class TaskExecuteAckCommand implements Serializable {
@Override
public String toString() {
- return "TaskExecuteAckCommand{"
+ return "TaskExecuteRunningCommand{"
+ "taskInstanceId=" + taskInstanceId
+ + ", processInstanceId='" + processInstanceId + '\''
+ ", startTime=" + startTime
+ ", host='" + host + '\''
+ ", status=" + status
+ ", logPath='" + logPath + '\''
+ ", executePath='" + executePath + '\''
- + ", processInstanceId='" + processInstanceId + '\''
+ + ", processId=" + processId + '\''
+ + ", appIds='" + appIds + '\''
+ '}';
}
-
- public int getProcessInstanceId() {
- return processInstanceId;
- }
-
- public void setProcessInstanceId(int processInstanceId) {
- this.processInstanceId = processInstanceId;
- }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf377d8..2255c08 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.service.process;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+
+import static java.util.stream.Collectors.toSet;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -124,11 +130,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -143,17 +148,16 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
/**
* process relative dao that some mappers in this.
@@ -164,6 +168,7 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
@@ -266,8 +271,8 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
- * @param logger logger
- * @param host host
+ * @param logger logger
+ * @param host host
* @param command found command
* @return process instance
*/
@@ -368,7 +373,7 @@ public class ProcessService {
/**
* set process waiting thread
*
- * @param command command
+ * @param command command
* @param processInstance processInstance
* @return process instance
*/
@@ -581,8 +586,6 @@ public class ProcessService {
/**
* recursive delete all task instance by process instance id
- *
- * @param processInstanceId
*/
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
@@ -603,7 +606,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentCode parentCode
- * @param ids ids
+ * @param ids ids
*/
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
@@ -628,7 +631,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
- * @param originCommand originCommand
+ * @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
@@ -684,7 +687,7 @@ public class ProcessService {
/**
* get schedule time from command
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return date
*/
@@ -713,8 +716,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
+ * @param command command
+ * @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@@ -799,7 +802,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
- * @param userId userId
+ * @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -837,7 +840,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@@ -857,7 +860,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
- * @param host host
+ * @param host host
* @return process instance
*/
protected ProcessInstance constructProcessInstance(Command command, String host) {
@@ -1036,7 +1039,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
- * @param command command
+ * @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@@ -1051,8 +1054,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
+ * @param processInstance processInstance
+ * @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@@ -1125,7 +1128,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
+ * @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@@ -1192,7 +1195,7 @@ public class ProcessService {
* submit sub process to command
*
* @param processInstance processInstance
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @return task instance
*/
@Transactional(rollbackFor = Exception.class)
@@ -1223,7 +1226,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@@ -1252,7 +1255,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@@ -1278,7 +1281,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
- * @param task task
+ * @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
@@ -1412,7 +1415,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionCode childDefinitionId
+ * @param childDefinitionCode childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
@@ -1427,7 +1430,7 @@ public class ProcessService {
/**
* submit task to mysql
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@@ -1463,7 +1466,7 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return process instance state
*/
@@ -1476,6 +1479,7 @@ public class ProcessService {
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
+ || state == ExecutionStatus.DISPATCH
) {
return state;
}
@@ -1689,7 +1693,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
- * @param state state
+ * @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
@@ -1744,7 +1748,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
@@ -1766,7 +1770,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
@@ -1796,29 +1800,6 @@ public class ProcessService {
}
/**
- * change task state
- *
- * @param state state
- * @param startTime startTime
- * @param host host
- * @param executePath executePath
- * @param logPath logPath
- */
- public void changeTaskState(TaskInstance taskInstance,
- ExecutionStatus state,
- Date startTime,
- String host,
- String executePath,
- String logPath) {
- taskInstance.setState(state);
- taskInstance.setStartTime(startTime);
- taskInstance.setHost(host);
- taskInstance.setExecutePath(executePath);
- taskInstance.setLogPath(logPath);
- saveTaskInstance(taskInstance);
- }
-
- /**
* update process instance
*
* @param processInstance processInstance
@@ -1829,27 +1810,6 @@ public class ProcessService {
}
/**
- * change task state
- *
- * @param state state
- * @param endTime endTime
- * @param varPool varPool
- */
- public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
- Date endTime,
- int processId,
- String appIds,
- String varPool) {
- taskInstance.setPid(processId);
- taskInstance.setAppLink(appIds);
- taskInstance.setState(state);
- taskInstance.setEndTime(endTime);
- taskInstance.setVarPool(varPool);
- changeOutParam(taskInstance);
- saveTaskInstance(taskInstance);
- }
-
- /**
* for show in page of taskInstance
*/
public void changeOutParam(TaskInstance taskInstance) {
@@ -2006,7 +1966,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
+ * @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
@@ -2042,7 +2002,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
- * @param resName resource name
+ * @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@@ -2080,7 +2040,7 @@ public class ProcessService {
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
- * @param dateInterval dateInterval
+ * @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
@@ -2093,7 +2053,7 @@ public class ProcessService {
* find last manual process instance interval
*
* @param definitionCode process definition code
- * @param dateInterval dateInterval
+ * @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
@@ -2106,8 +2066,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionCode process definition code
- * @param startTime start time
- * @param endTime end time
+ * @param startTime start time
+ * @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
@@ -2191,7 +2151,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
- * @param userId user id
+ * @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
@@ -2599,7 +2559,7 @@ public class ProcessService {
* add authorized resources
*
* @param ownResources own resources
- * @param userId userId
+ * @param userId userId
*/
private void addAuthorizedResources(List<Resource> ownResources, int userId) {
List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
@@ -2742,12 +2702,7 @@ public class ProcessService {
/**
* the first time (when submit the task ) get the resource of the task group
*
- * @param taskId task id
- * @param taskName
- * @param groupId
- * @param processId
- * @param priority
- * @return
+ * @param taskId task id
*/
public boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
@@ -2788,9 +2743,6 @@ public class ProcessService {
/**
* try to get the task group resource(when other task release the resource)
- *
- * @param taskGroupQueue
- * @return
*/
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
@@ -2877,11 +2829,11 @@ public class ProcessService {
/**
* insert into task group queue
*
- * @param taskId task id
- * @param taskName task name
- * @param groupId group id
+ * @param taskId task id
+ * @param taskName task name
+ * @param groupId group id
* @param processId process id
- * @param priority priority
+ * @param priority priority
* @return result and msg code
*/
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 882ab59..581c73c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -208,6 +208,16 @@ public class TaskExecutionContext {
private ResourceParametersHelper resourceParametersHelper;
/**
+ * endTime
+ */
+ private Date endTime;
+
+ /**
+ * sql TaskExecutionContext
+ */
+ private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+ /**
* resources full name and tenant code
*/
private Map<String, String> resources;
@@ -538,12 +548,61 @@ public class TaskExecutionContext {
this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
}
+ public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
+ this.currentExecutionStatus = currentExecutionStatus;
+ }
+
public ExecutionStatus getCurrentExecutionStatus() {
return currentExecutionStatus;
}
- public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
- this.currentExecutionStatus = currentExecutionStatus;
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Date endTime) {
+ this.endTime = endTime;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutionContext{"
+ + "taskInstanceId=" + taskInstanceId
+ + ", taskName='" + taskName + '\''
+ + ", currentExecutionStatus=" + currentExecutionStatus
+ + ", firstSubmitTime=" + firstSubmitTime
+ + ", startTime=" + startTime
+ + ", taskType='" + taskType + '\''
+ + ", host='" + host + '\''
+ + ", executePath='" + executePath + '\''
+ + ", logPath='" + logPath + '\''
+ + ", taskJson='" + taskJson + '\''
+ + ", processId=" + processId
+ + ", processDefineCode=" + processDefineCode
+ + ", processDefineVersion=" + processDefineVersion
+ + ", appIds='" + appIds + '\''
+ + ", processInstanceId=" + processInstanceId
+ + ", scheduleTime=" + scheduleTime
+ + ", globalParams='" + globalParams + '\''
+ + ", executorId=" + executorId
+ + ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ + ", tenantCode='" + tenantCode + '\''
+ + ", queue='" + queue + '\''
+ + ", projectCode=" + projectCode
+ + ", taskParams='" + taskParams + '\''
+ + ", envFile='" + envFile + '\''
+ + ", dryRun='" + dryRun + '\''
+ + ", definedParams=" + definedParams
+ + ", taskAppId='" + taskAppId + '\''
+ + ", taskTimeoutStrategy=" + taskTimeoutStrategy
+ + ", taskTimeout=" + taskTimeout
+ + ", workerGroup='" + workerGroup + '\''
+ + ", environmentConfig='" + environmentConfig + '\''
+ + ", delayTime=" + delayTime
+ + ", resources=" + resources
+ + ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ + ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ + '}';
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
index 7295f43..8d2774c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/ExecutionStatus.java
@@ -62,7 +62,9 @@ public enum ExecutionStatus {
FORCED_SUCCESS(13, "forced success"),
SERIAL_WAIT(14, "serial wait"),
READY_BLOCK(15, "ready block"),
- BLOCK(16, "block");
+ BLOCK(16, "block"),
+ DISPATCH(17, "dispatch"),
+ ;
ExecutionStatus(int code, String descp) {
this.code = code;
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 4311612..45401b6 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -28,10 +28,10 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
@@ -111,10 +111,10 @@ public class WorkerServer implements IStoppable {
private TaskKillProcessor taskKillProcessor;
@Autowired
- private DBTaskAckProcessor dbTaskAckProcessor;
+ private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
- private DBTaskResponseProcessor dbTaskResponseProcessor;
+ private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@@ -143,8 +143,8 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, dbTaskAckProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, dbTaskResponseProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
index aaf557b..f28990b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
@@ -30,28 +30,30 @@ public class ResponseCache {
private static final ResponseCache instance = new ResponseCache();
- private ResponseCache(){}
+ private ResponseCache() {
+ }
public static ResponseCache get() {
return instance;
}
- private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
- private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
+ private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
+ private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
/**
* cache response
+ *
* @param taskInstanceId taskInstanceId
* @param command command
* @param event event ACK/RESULT
*/
public void cache(Integer taskInstanceId, Command command, Event event) {
switch (event) {
- case ACK:
- ackCache.put(taskInstanceId,command);
+ case RUNNING:
+ runningCache.put(taskInstanceId, command);
break;
case RESULT:
- responseCache.put(taskInstanceId,command);
+ responseCache.put(taskInstanceId, command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
@@ -59,15 +61,17 @@ public class ResponseCache {
}
/**
- * remove ack cache
+ * remove running cache
+ *
* @param taskInstanceId taskInstanceId
*/
- public void removeAckCache(Integer taskInstanceId) {
- ackCache.remove(taskInstanceId);
+ public void removeRunningCache(Integer taskInstanceId) {
+ runningCache.remove(taskInstanceId);
}
/**
- * remove reponse cache
+ * remove response cache
+ *
* @param taskInstanceId taskInstanceId
*/
public void removeResponseCache(Integer taskInstanceId) {
@@ -75,18 +79,20 @@ public class ResponseCache {
}
/**
- * getAckCache
+ * get running cache
+ *
* @return getAckCache
*/
- public Map<Integer,Command> getAckCache() {
- return ackCache;
+ public Map<Integer, Command> getRunningCache() {
+ return runningCache;
}
/**
* getResponseCache
+ *
* @return getResponseCache
*/
- public Map<Integer,Command> getResponseCache() {
+ public Map<Integer, Command> getResponseCache() {
return responseCache;
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 09b2c3a..9a0e894 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -19,16 +19,25 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
+import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
@@ -45,6 +54,12 @@ public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
+ @Autowired
+ private TaskExecuteResponseAckProcessor taskExecuteRunningProcessor;
+
+ @Autowired
+ private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
+
/**
* remote channels
*/
@@ -58,15 +73,15 @@ public class TaskCallbackService {
public TaskCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
- this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
- this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
+ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}
/**
* add callback channel
*
* @param taskInstanceId taskInstanceId
- * @param channel channel
+ * @param channel channel
*/
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
REMOTE_CHANNELS.put(taskInstanceId, channel);
@@ -129,25 +144,12 @@ public class TaskCallbackService {
}
/**
- * send ack
- *
- * @param taskInstanceId taskInstanceId
- * @param command command
- */
- public void sendAck(int taskInstanceId, Command command) {
- NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
- if (nettyRemoteChannel != null) {
- nettyRemoteChannel.writeAndFlush(command);
- }
- }
-
- /**
* send result
*
* @param taskInstanceId taskInstanceId
- * @param command command
+ * @param command command
*/
- public void sendResult(int taskInstanceId, Command command) {
+ public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@@ -161,6 +163,99 @@ public class TaskCallbackService {
}
});
}
+ }
+
+ /**
+ * build task execute running command
+ *
+ * @param taskExecutionContext taskExecutionContext
+ * @return TaskExecuteAckCommand
+ */
+ private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
+ command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+ command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+ command.setLogPath(taskExecutionContext.getLogPath());
+ command.setHost(taskExecutionContext.getHost());
+ command.setStartTime(taskExecutionContext.getStartTime());
+ command.setExecutePath(taskExecutionContext.getExecutePath());
+ return command;
+ }
+
+ /**
+ * build task execute response command
+ *
+ * @param taskExecutionContext taskExecutionContext
+ * @return TaskExecuteResponseCommand
+ */
+ private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
+ command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
+ command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+ command.setLogPath(taskExecutionContext.getLogPath());
+ command.setExecutePath(taskExecutionContext.getExecutePath());
+ command.setAppIds(taskExecutionContext.getAppIds());
+ command.setProcessId(taskExecutionContext.getProcessId());
+ command.setHost(taskExecutionContext.getHost());
+ command.setStartTime(taskExecutionContext.getStartTime());
+ command.setEndTime(taskExecutionContext.getEndTime());
+ command.setVarPool(taskExecutionContext.getVarPool());
+ command.setExecutePath(taskExecutionContext.getExecutePath());
+ return command;
+ }
+
+ /**
+ * build TaskKillResponseCommand
+ *
+ * @param taskExecutionContext taskExecutionContext
+ * @return build TaskKillResponseCommand
+ */
+ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
+ TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+ taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
+ taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
+ taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+ taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+ return taskKillResponseCommand;
+ }
+
+ /**
+ * send task execute running command
+ * todo unified callback command
+ */
+ public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
+ // add response cache
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
+ send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+ }
+
+ /**
+ * send task execute delay command
+ * todo unified callback command
+ */
+ public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
+ send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+ }
+
+ /**
+ * send task execute response command
+ * todo unified callback command
+ */
+ public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
+ TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
+ // add response cache
+ ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT);
+ send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
+ }
+ public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
+ TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
+ send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
+ TaskCallbackService.remove(taskExecutionContext.getTaskInstanceId());
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 88021e1..29da668 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -30,12 +29,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@@ -128,6 +125,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
+ if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
+ OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
+ }
+
+ // check if the OS user exists
+ if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+ logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
+ taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskExecutionContext.setEndTime(new Date());
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ return;
+ }
+
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
@@ -135,12 +147,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
- if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
- OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
- }
} catch (Throwable ex) {
- logger.error("create execLocalPath: {}", execLocalPath, ex);
+ logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
+ logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
+ return;
}
}
@@ -153,48 +166,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
- } else {
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
- taskExecutionContext.setStartTime(new Date());
+ taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
}
- this.doAck(taskExecutionContext);
-
// submit task to manager
- if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) {
- logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize());
+ boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
+ if (!offer) {
+ logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
+ workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
}
- private void doAck(TaskExecutionContext taskExecutionContext) {
- // tell master that task is in executing
- TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
- taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
- }
-
- /**
- * build ack command
- *
- * @param taskExecutionContext taskExecutionContext
- * @return TaskExecuteAckCommand
- */
- private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
- TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
- ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
- ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
- ackCommand.setHost(taskExecutionContext.getHost());
- ackCommand.setStartTime(taskExecutionContext.getStartTime());
-
- ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
-
- taskExecutionContext.setLogPath(ackCommand.getLogPath());
- ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-
- return ackCommand;
- }
-
/**
* get execute local path
*
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
similarity index 60%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
index bedc5a6..e080842 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
@@ -34,31 +34,31 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
- * db task response processor
+ * task execute running ack, from master to worker
*/
@Component
-public class DBTaskResponseProcessor implements NettyRequestProcessor {
+public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
- DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
- command.getBody(), DBTaskResponseCommand.class);
+ TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
+ command.getBody(), TaskExecuteResponseAckCommand.class);
- if (taskResponseCommand == null) {
- logger.error("dBTask Response command is null");
+ if (taskExecuteResponseAckCommand == null) {
+ logger.error("task execute response ack command is null");
return;
}
- logger.info("dBTask Response command : {}", taskResponseCommand);
+ logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
- if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
- TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
- logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
+ if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
+ TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
+ logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId());
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
similarity index 64%
rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
index 7a80f84..9d74dc5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
@@ -34,29 +34,29 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
- * db task ack processor
+ * task execute running ack processor
*/
@Component
-public class DBTaskAckProcessor implements NettyRequestProcessor {
+public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
+ private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
+ Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
- DBTaskAckCommand taskAckCommand = JSONUtils.parseObject(
- command.getBody(), DBTaskAckCommand.class);
+ TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
+ command.getBody(), TaskExecuteRunningAckCommand.class);
- if (taskAckCommand == null) {
- logger.error("dBTask ACK request command is null");
+ if (runningAckCommand == null) {
+ logger.error("task execute running ack command is null");
return;
}
- logger.info("dBTask ACK request command : {}", taskAckCommand);
+ logger.info("task execute running ack command : {}", runningAckCommand);
- if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
- ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+ if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
+ ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 3ec0e9a..1c8fe55 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@@ -90,10 +91,17 @@ public class TaskKillProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
- TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
- taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
- TaskCallbackService.remove(killCommand.getTaskInstanceId());
+ TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+ if (taskExecutionContext == null) {
+ logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
+ return;
+ }
+ taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
+ taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
+
+ taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index 4201373..fc737ca 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -47,7 +47,7 @@ public class RetryReportTaskStatusThread implements Runnable {
private TaskCallbackService taskCallbackService;
public void start() {
- Thread thread = new Thread(this,"RetryReportTaskStatusThread");
+ Thread thread = new Thread(this, "RetryReportTaskStatusThread");
thread.setDaemon(true);
thread.start();
}
@@ -65,21 +65,21 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
- if (!instance.getAckCache().isEmpty()) {
- Map<Integer,Command> ackCache = instance.getAckCache();
- for (Map.Entry<Integer, Command> entry : ackCache.entrySet()) {
+ if (!instance.getRunningCache().isEmpty()) {
+ Map<Integer, Command> runningCache = instance.getRunningCache();
+ for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
- Command ackCommand = entry.getValue();
- taskCallbackService.sendAck(taskInstanceId,ackCommand);
+ Command runningCommand = entry.getValue();
+ taskCallbackService.send(taskInstanceId, runningCommand);
}
}
if (!instance.getResponseCache().isEmpty()) {
- Map<Integer,Command> responseCache = instance.getResponseCache();
+ Map<Integer, Command> responseCache = instance.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
- taskCallbackService.sendResult(taskInstanceId,responseCommand);
+ taskCallbackService.send(taskInstanceId, responseCommand);
}
}
} catch (Exception e) {
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 8f3dc3a..7670d49 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,19 +17,15 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import com.github.rholder.retry.RetryException;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,17 +33,14 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
-import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.IOException;
@@ -57,11 +50,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* task scheduler thread
@@ -131,35 +124,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
- TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
- responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
- responseCommand.setEndTime(new Date());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
+ taskExecutionContext.setStartTime(new Date());
+ taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
- // check if the OS user exists
- if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
- String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
- logger.error(errorLog);
- responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
- responseCommand.setEndTime(new Date());
- return;
- }
-
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
- if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) {
- changeTaskExecutionStatusToRunning();
- }
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
+ // callback task execute running
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+ taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
+
// copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
@@ -197,29 +181,27 @@ public class TaskExecuteThread implements Runnable, Delayed {
// task handle
this.task.handle();
- responseCommand.setStatus(this.task.getExitStatus().getCode());
-
// task result process
if (this.task.getNeedAlert()) {
- sendAlert(this.task.getTaskAlertInfo(), responseCommand.getStatus());
+ sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
- responseCommand.setEndTime(new Date());
- responseCommand.setProcessId(this.task.getProcessId());
- responseCommand.setAppIds(this.task.getAppIds());
- responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
+ taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
+ taskExecutionContext.setProcessId(this.task.getProcessId());
+ taskExecutionContext.setAppIds(this.task.getAppIds());
+ taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
- responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
- responseCommand.setEndTime(new Date());
- responseCommand.setProcessId(task.getProcessId());
- responseCommand.setAppIds(task.getAppIds());
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
+ taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
+ taskExecutionContext.setProcessId(this.task.getProcessId());
+ taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
clearTaskExecPath();
}
}
@@ -312,7 +294,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
// query the tenant code of the resource according to the name of the resource
String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}", resHdfsPath);
- storageOperate.download(tenantCode,resHdfsPath, execLocalPath + File.separator + fullName, false, true);
+ storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ServiceException(e.getMessage());
@@ -324,40 +306,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
/**
- * send an ack to change the status of the task.
- */
- private void changeTaskExecutionStatusToRunning() {
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
- Command ackCommand = buildAckCommand().convert2Command();
- try {
- RetryerUtils.retryCall(() -> {
- taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
- return Boolean.TRUE;
- });
- } catch (ExecutionException | RetryException e) {
- logger.error(e.getMessage(), e);
- }
- }
-
- /**
- * build ack command.
- *
- * @return TaskExecuteAckCommand
- */
- private TaskExecuteAckCommand buildAckCommand() {
- TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
- ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
- ackCommand.setStartTime(taskExecutionContext.getStartTime());
- ackCommand.setLogPath(taskExecutionContext.getLogPath());
- ackCommand.setHost(taskExecutionContext.getHost());
-
- ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
-
- return ackCommand;
- }
-
- /**
* get current TaskExecutionContext
*
* @return TaskExecutionContext
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 9fd7bafc..0dd28e5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -108,7 +108,7 @@ public class WorkerManagerThread implements Runnable {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
responseCommand.setStatus(ExecutionStatus.KILL.getCode());
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
- taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+ taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
/**
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
index fba6c0e..2bb23ca 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
- JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+ JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskExecuteProcessorTest {
@@ -84,7 +84,7 @@ public class TaskExecuteProcessorTest {
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
- ackCommand = new TaskExecuteAckCommand().convert2Command();
+ ackCommand = new TaskExecuteRunningCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand();
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
@@ -95,7 +95,7 @@ public class TaskExecuteProcessorTest {
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
- PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+ PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
@@ -125,10 +125,10 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
- taskExecutionContext.getProcessDefineCode(),
- taskExecutionContext.getProcessDefineVersion(),
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId()))
+ taskExecutionContext.getProcessDefineCode(),
+ taskExecutionContext.getProcessDefineVersion(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());