You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/25 02:37:16 UTC
[incubator-dolphinscheduler] 01/03: Refactor worker (#4)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
commit 1292b03e33739378834b210695f0778666152684
Author: Tboy <te...@yeah.net>
AuthorDate: Mon Feb 24 10:00:54 2020 +0800
Refactor worker (#4)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1, master persistent task 2. extract master and worker communication model (#1992)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* TaskExecutionContext create modify (#1994)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* updates
* add- register processor
Co-authored-by: qiaozhanwei <qi...@outlook.com>
* buildAckCommand taskInstanceId not set modify (#2002)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
.../server/worker/runner/TaskScheduleThread.java | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index b288aea..735e4ba 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -97,7 +97,7 @@ public class TaskScheduleThread implements Runnable {
try {
// tell master that task is in executing
- ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
+ ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath());
@@ -182,17 +182,20 @@ public class TaskScheduleThread implements Runnable {
}
return globalParamsMap;
}
+
/**
- * build ack command
- * @param taskType taskType
+ * build ack command
+ * @param taskExecutionContext taskExecutionContext
+ * @return ExecuteTaskAckCommand
*/
- private ExecuteTaskAckCommand buildAckCommand(String taskType) {
+ private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+ ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
- if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
+ if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());