You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/25 02:37:16 UTC

[incubator-dolphinscheduler] 01/03: Refactor worker (#4)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git

commit 1292b03e33739378834b210695f0778666152684
Author: Tboy <te...@yeah.net>
AuthorDate: Mon Feb 24 10:00:54 2020 +0800

    Refactor worker (#4)
    
    * Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1. master persistent task 2. extract master and worker communication model (#1992)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * buildAckCommand taskInstanceId not set modify (#2002)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../server/worker/runner/TaskScheduleThread.java            | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index b288aea..735e4ba 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -97,7 +97,7 @@ public class TaskScheduleThread implements Runnable {
 
         try {
             // tell master that task is in executing
-            ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
+            ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
             taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
 
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
@@ -182,17 +182,20 @@ public class TaskScheduleThread implements Runnable {
         }
         return globalParamsMap;
     }
+
     /**
-     *  build ack command
-     * @param taskType taskType
+     * build ack command
+     * @param taskExecutionContext taskExecutionContext
+     * @return ExecuteTaskAckCommand
      */
-    private ExecuteTaskAckCommand buildAckCommand(String taskType) {
+    private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
         ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+        ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
         ackCommand.setLogPath(getTaskLogPath());
         ackCommand.setHost(OSUtils.getHost());
         ackCommand.setStartTime(new Date());
-        if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
+        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
             ackCommand.setExecutePath(null);
         }else{
             ackCommand.setExecutePath(taskExecutionContext.getExecutePath());