You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/19 13:57:22 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#1979)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new ac93d78  Refactor worker (#1979)
ac93d78 is described below

commit ac93d783796e4819ac6a46b1bab7efa272e79b47
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Feb 19 21:57:16 2020 +0800

    Refactor worker (#1979)
    
    * updates
    
    * move FetchTaskThread logic to WorkerNettyRequestProcessor
    
    * add NettyRemotingClient to scheduler thread
    
    * refactor TaskScheduleThread, add TaskInstanceCallbackService
---
 .../remote/command/CommandType.java                |  2 +-
 .../remote/command/ExecuteTaskAckCommand.java      |  1 +
 .../remote/command/ExecuteTaskResponseCommand.java |  2 +-
 .../apache/dolphinscheduler/remote/utils/Pair.java |  4 ++
 .../server/worker/processor/CallbackChannel.java   | 38 +++++------
 .../processor/TaskInstanceCallbackService.java     | 76 ++++++++++++++++++++++
 .../processor/WorkerNettyRequestProcessor.java     |  8 ++-
 .../server/worker/runner/TaskScheduleThread.java   | 73 +++++++++++----------
 8 files changed, 146 insertions(+), 58 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index b1b24d3..79ef2d9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


/**
 * Command types of the remote module: log viewing, task execution and ping/pong.
 */
public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    // NOTE(review): WORKER_REQUEST and MASTER_RESPONSE carry no documentation here;
    // presumably a worker-to-master request and its reply - confirm against their users.
    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    EXECUTE_TASK_REQUEST,

    /**
     * execute task response
     */
    EXECUTE_TASK_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


/**
 * Command types of the remote module: log viewing, task execution and ping/pong.
 */
public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
     */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    // NOTE(review): WORKER_REQUEST and MASTER_RESPONSE carry no documentation here;
    // presumably a worker-to-master request and its reply - confirm against their users.
    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    EXECUTE_TASK_REQUEST,


    /**
     * execute task ack
     *
     * NOTE(review): declared between EXECUTE_TASK_REQUEST and EXECUTE_TASK_RESPONSE,
     * so the ordinals of the constants after it depend on this position - confirm
     * that the command type is not encoded by ordinal anywhere.
     */
    EXECUTE_TASK_ACK,
    /**
     * execute task response
     */
    EXECUTE_TASK_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
new file mode 100644
index 0000000..24ab68f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
@@ -0,0 +1 @@
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 request command
 */
public class ExecuteTaskAckCommand implements Serializable {

    private int taskInstanceId;

    private Date startTime;

    private String host;

    private int status;

    private String logPath;

    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = lo
 gPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(long opaque){
        Command command = new Command(opaque);
        command.setType(CommandType.EXECUTE_TASK_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
index 7e35fa6..3e6d5c1 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong
 ;

/**
 *  execute task response command
 *
 *  Plain data holder serialized into the body of a {@link Command} of type
 *  {@link CommandType#EXECUTE_TASK_RESPONSE}.
 */
public class ExecuteTaskResponseCommand implements Serializable {

    /**
     *  task id
     */
    private String taskId;

    /**
     *  attempt id
     */
    private String attemptId;

    /**
     *  return result
     */
    private Object result;

    /**
     *  received time
     */
    private long receivedTime;

    /**
     *  execute count
     */
    private int executeCount;

    /**
     *  execute time
     */
    private long executeTime;

    public String getAttemptId() {
        return attemptId;
    }

    public void setAttemptId(String attemptId) {
        this.attemptId = attemptId;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public long getReceivedTime() {
        return receivedTime;
    }

    public void setReceivedTime(long receivedTime) {
        this.receivedTime = receivedTime;
    }

    public int getExecuteCount() {
        return executeCount;
    }

    public void setExecuteCount(int executeCount) {
        this.executeCount = executeCount;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    /**
     *  package response command
     *
     * @param opaque not used by this method
     * @return command of type EXECUTE_TASK_RESPONSE whose body is this object serialized
     *         with {@link FastJsonSerializer}
     */
    public Command convert2Command(long opaque){
        // NOTE(review): opaque is accepted but never passed on; the command is built with
        // the no-arg constructor - confirm whether new Command(opaque) was intended.
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    public Command convert2Command(){
        Command command = new Command();
        com
 mand.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
index 2042191..33bf8ca 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -50,4 +50,8 @@ public class Pair<L, R> {
     public void setRight(R right) {
         this.right = right;
     }
+
+    public static <L, R> Pair of(L left, R right){
+        return new Pair(left, right);
+    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
similarity index 60%
copy from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
index 2042191..95345c0 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
@@ -15,39 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote.utils;
+package org.apache.dolphinscheduler.server.worker.processor;
 
+import io.netty.channel.Channel;
 
-/**
- * key value pair
- *
- * @param <L> L generic type
- * @param <R> R generic type
- */
-public class Pair<L, R> {
 
-    private L left;
+public class CallbackChannel {
+
+    private Channel channel;
 
-    private R right;
+    private long opaque;
 
-    public Pair(L left, R right) {
-        this.left = left;
-        this.right = right;
+    public CallbackChannel(Channel channel, long opaque) {
+        this.channel = channel;
+        this.opaque = opaque;
     }
 
-    public L getLeft() {
-        return left;
+    public Channel getChannel() {
+        return channel;
     }
 
-    public void setLeft(L left) {
-        this.left = left;
+    public void setChannel(Channel channel) {
+        this.channel = channel;
     }
 
-    public R getRight() {
-        return right;
+    public long getOpaque() {
+        return opaque;
     }
 
-    public void setRight(R right) {
-        this.right = right;
+    public void setOpaque(long opaque) {
+        this.opaque = opaque;
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java
new file mode 100644
index 0000000..0480d94
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TaskInstanceCallbackService {
+
+    private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
+
+    public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
+        CALL_BACK_CHANNELS.put(taskInstanceId, channel);
+    }
+
+    public CallbackChannel getCallbackChannel(int taskInstanceId){
+        CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
+        if(callbackChannel.getChannel().isActive()){
+            return callbackChannel;
+        }
+        Channel newChannel = createChannel();
+        callbackChannel.setChannel(newChannel);
+        CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
+        return callbackChannel;
+    }
+
+    public void remove(int taskInstanceId){
+        CALL_BACK_CHANNELS.remove(taskInstanceId);
+    }
+
+    public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
+        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
+        callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
+    }
+
+    public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
+        CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
+        callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if(future.isSuccess()){
+                    remove(taskInstanceId);
+                    return;
+                }
+            }
+        });
+    }
+
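+    //TODO not implemented yet: returns null, so getCallbackChannel() stores a null channel whenever the original channel is inactive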
+    private Channel createChannel(){
+        return null;
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
index c0db034..2e5ea99 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
@@ -51,8 +51,11 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
 
     private final WorkerConfig workerConfig;
 
+    private final TaskInstanceCallbackService taskInstanceCallbackService;
+
     public WorkerNettyRequestProcessor(ProcessService processService){
         this.processService = processService;
+        this.taskInstanceCallbackService = new TaskInstanceCallbackService();
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
     }
@@ -62,6 +65,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
         Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
         logger.debug("received command : {}", command);
         TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
+        //TODO from here: this section needs to be removed from the worker and moved into the master.
         int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
         Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
         // verify tenant is null
@@ -73,6 +77,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
         String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
         taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
         taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+        //TODO to here (end of the section to move into the master).
         // local execute path
         String execLocalPath = getExecLocalPath(taskInstance);
         logger.info("task instance  local execute path : {} ", execLocalPath);
@@ -84,7 +89,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
             logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
         }
         // submit task
-        workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
+        taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque()));
+        workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService));
     }
 
     private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index a69cffd..96cb0c2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -24,17 +24,21 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@@ -74,22 +78,31 @@ public class TaskScheduleThread implements Runnable {
     private AbstractTask task;
 
     /**
+     *  task instance callback service
+     */
+    private TaskInstanceCallbackService taskInstanceCallbackService;
+
+    /**
      * constructor
      *
      * @param taskInstance  task instance
      * @param processService    process dao
      */
-    public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService){
+    public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){
         this.processService = processService;
         this.taskInstance = taskInstance;
+        this.taskInstanceCallbackService = taskInstanceCallbackService;
     }
 
     @Override
     public void run() {
 
+        ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
+
         try {
-            // update task state is running according to task type
-            updateTaskState(taskInstance.getTaskType());
+            // tell master that task is in executing
+            ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
+            taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
 
             logger.info("script path : {}", taskInstance.getExecutePath());
             // task node
@@ -148,22 +161,21 @@ public class TaskScheduleThread implements Runnable {
             // task result process
             task.after();
 
+            // record the exit status and end time in the response command sent from the finally block
+            responseCommand.setStatus(task.getExitStatus().getCode());
+            responseCommand.setEndTime(new Date());
+            logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
+
         }catch (Exception e){
             logger.error("task scheduler failure", e);
             kill();
-            // update task instance state
-            processService.changeTaskState(ExecutionStatus.FAILURE,
-                    new Date(),
-                    taskInstance.getId());
+            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
+            responseCommand.setEndTime(new Date());
+
+        } finally {
+            taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
         }
 
-        logger.info("task instance id : {},task final status : {}",
-                taskInstance.getId(),
-                task.getExitStatus());
-        // update task instance state
-        processService.changeTaskState(task.getExitStatus(),
-                new Date(),
-                taskInstance.getId());
     }
 
     /**
@@ -182,29 +194,22 @@ public class TaskScheduleThread implements Runnable {
         }
         return globalParamsMap;
     }
-
     /**
-     *  update task state according to task type
+     *  build ack command
      * @param taskType
      */
-    private void updateTaskState(String taskType) {
-        // update task status is running
-        if(taskType.equals(TaskType.SQL.name())  ||
-                taskType.equals(TaskType.PROCEDURE.name())){
-            processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    getTaskLogPath(),
-                    taskInstance.getId());
+    private ExecuteTaskAckCommand buildAckCommand(String taskType) {
+        ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+        ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
+        ackCommand.setLogPath(getTaskLogPath());
+        ackCommand.setHost("localhost");
+        ackCommand.setStartTime(new Date());
+        if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
+            ackCommand.setExecutePath(null);
         }else{
-            processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    taskInstance.getExecutePath(),
-                    getTaskLogPath(),
-                    taskInstance.getId());
+            ackCommand.setExecutePath(taskInstance.getExecutePath());
         }
+        return ackCommand;
     }
 
     /**