You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/02 14:15:16 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: refactor kill logic (#2060)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 71c11bd  refactor kill logic (#2060)
71c11bd is described below

commit 71c11bd59474787da9ad49d2a30c312afad41440
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Mar 2 22:15:08 2020 +0800

    refactor kill logic (#2060)
---
 .../remote/command/CommandType.java                |   2 +-
 ...kAckCommand.java => TaskExecuteAckCommand.java} |   2 +-
 ...Command.java => TaskExecuteRequestCommand.java} |   2 +-
 ...ommand.java => TaskExecuteResponseCommand.java} |   2 +-
 ...estCommand.java => TaskKillRequestCommand.java} |   2 +-
 ...seCommand.java => TaskKillResponseCommand.java} |   2 +-
 .../server/master/MasterServer.java                |   6 +-
 .../master/cache/TaskInstanceCacheManager.java     |  10 +-
 .../cache/impl/TaskInstanceCacheManagerImpl.java   |  16 +--
 .../master/dispatch/executor/ExecutorManager.java  |   8 ++
 .../dispatch/executor/NettyExecutorManager.java    |  20 +++-
 .../master/dispatch/executor/NettyKillManager.java | 119 ---------------------
 .../server/master/processor/TaskAckProcessor.java  |   8 +-
 .../processor/TaskKillResponseProcessor.java       |  13 +--
 .../master/processor/TaskResponseProcessor.java    |   6 +-
 .../server/master/runner/MasterTaskExecThread.java |   8 +-
 .../server/worker/WorkerServer.java                |   4 +-
 .../worker/processor/KillTaskCallbackService.java  | 116 --------------------
 .../worker/processor/TaskCallbackService.java      |  17 +--
 .../worker/processor/TaskExecuteProcessor.java     |  22 ++--
 .../server/worker/processor/TaskKillProcessor.java |  44 ++++----
 .../server/worker/runner/TaskExecuteThread.java    |   6 +-
 22 files changed, 107 insertions(+), 328 deletions(-)

diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index 053b38f..c8d5659 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
  
    */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    EXECUTE_TASK_REQUEST,

    /**
     * execute task ack
     */
    EXECUTE_TASK_ACK,

    /**
     * execute task response
     */
    EXECUTE_TASK_RESPONSE,

    /**
     * kill task
     */
    KILL_TASK_REQUEST,

    /**
     * kill task response
     */
    KILL_TASK_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;


public enum CommandType {

    /**
     *  roll view log request
     */
    ROLL_VIEW_LOG_REQUEST,

    /**
     *  roll view log response
  
    */
    ROLL_VIEW_LOG_RESPONSE,

    /**
     * view whole log request
     */
    VIEW_WHOLE_LOG_REQUEST,

    /**
     * view whole log response
     */
    VIEW_WHOLE_LOG_RESPONSE,

    /**
     * get log bytes request
     */
    GET_LOG_BYTES_REQUEST,

    /**
     * get log bytes response
     */
    GET_LOG_BYTES_RESPONSE,


    WORKER_REQUEST,
    MASTER_RESPONSE,

    /**
     * execute task request
     */
    TASK_EXECUTE_REQUEST,

    /**
     * execute task ack
     */
    TASK_EXECUTE_ACK,

    /**
     * execute task response
     */
    TASK_EXECUTE_RESPONSE,

    /**
     * kill task
     */
    TASK_KILL_REQUEST,

    /**
     * kill task response
     */
    TASK_KILL_RESPONSE,

    /**
     *  ping
     */
    PING,

    /**
     *  pong
     */
    PONG;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
similarity index 92%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
index 52d0a5d..0b3d901 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 request command
 */
public class ExecuteTaskAckCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * startTime
     */
    private Date startTime;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;

    /**
     * logPath
     */
    private String logPath;

    /**
     * executePath
     */
    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstan
 ceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                "
 , logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 request command
 */
public class TaskExecuteAckCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * startTime
     */
    private Date startTime;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;

    /**
     * logPath
     */
    private String logPath;

    /**
     * executePath
     */
    private String executePath;

    public Date getStartTime() {
        return startTime;
    }

    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstan
 ceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_ACK);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteAckCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", startTime=" + startTime +
                ", host='" + host + '\'' +
                ", status=" + status +
                "
 , logPath='" + logPath + '\'' +
                ", executePath='" + executePath + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
similarity index 82%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
index e7564ed..637724f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
pub
 lic class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public ExecuteTaskRequestCommand() {
    }

    public ExecuteTaskRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskRequestCommand{" +
           
      "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
pub
 lic class TaskExecuteRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public TaskExecuteRequestCommand() {
    }

    public TaskExecuteRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteRequestCommand{" +
           
      "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
similarity index 87%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index 707bf07..deb6f5d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class ExecuteTaskResponseCommand implements Serializable {


    public ExecuteTaskResponseCommand() {
    }

    public ExecuteTaskResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    /**
     * processId
     */
    private int processId;

    /**
     * appIds
     */
    private String appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        t
 his.endTime = endTime;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getAppIds() {
        return appIds;
    }

    public void setAppIds(String appIds) {
        this.appIds = appIds;
    }

    /**
     * package response command
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", status=" + status +
                ", endTime=" + endTime +
                ", processId=" + processId +
                ", appIds='" + appIds + '\'' +
                '}';
   
  }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;

/**
 *  execute task 
 response command
 */
public class TaskExecuteResponseCommand implements Serializable {


    public TaskExecuteResponseCommand() {
    }

    public TaskExecuteResponseCommand(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    /**
     *  task instance id
     */
    private int taskInstanceId;

    /**
     *  status
     */
    private int status;


    /**
     *  end time
     */
    private Date endTime;


    /**
     * processId
     */
    private int processId;

    /**
     * appIds
     */
    private String appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getEndTime() {
        return endTime;
    }

    public void setEndTime(Date endTime) {
        t
 his.endTime = endTime;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getAppIds() {
        return appIds;
    }

    public void setAppIds(String appIds) {
        this.appIds = appIds;
    }

    /**
     * package response command
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_EXECUTE_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskExecuteResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", status=" + status +
                ", endTime=" + endTime +
                ", processId=" + processId +
                ", appIds='" + appIds + '\'' +
                '}';
   
  }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
similarity index 83%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
index b8cfd89..e5c756a 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public
  class KillTaskRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public KillTaskRequestCommand() {
    }

    public KillTaskRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.KILL_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "KillTaskRequestCommand{" +
                "taskExecutio
 nContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public
  class TaskKillRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public TaskKillRequestCommand() {
    }

    public TaskKillRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillRequestCommand{" +
                "taskExecutio
 nContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
similarity index 93%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java
rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
index 515bd07..2ca2330 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;
import java.util.List;
 

/**
 *  kill task response command
 */
public class KillTaskResponseCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;


    /**
     * processId
     */
    private int processId;

    /**
     * other resource manager appId , for example : YARN etc
     */
    protected List<String> appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getProcessId() {
        return processId;
    }

    public void setPr
 ocessId(int processId) {
        this.processId = processId;
    }

    public List<String> getAppIds() {
        return appIds;
    }

    public void setAppIds(List<String> appIds) {
        this.appIds = appIds;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.KILL_TASK_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "KillTaskResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", processId=" + processId +
                ", appIds=" + appIds +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;
import java.util.Date;
import java.util.List;
 

/**
 *  kill task response command
 */
public class TaskKillResponseCommand implements Serializable {

    /**
     * taskInstanceId
     */
    private int taskInstanceId;

    /**
     * host
     */
    private String host;

    /**
     * status
     */
    private int status;


    /**
     * processId
     */
    private int processId;

    /**
     * other resource manager appId , for example : YARN etc
     */
    protected List<String> appIds;


    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public int getProcessId() {
        return processId;
    }

    public void setPr
 ocessId(int processId) {
        this.processId = processId;
    }

    public List<String> getAppIds() {
        return appIds;
    }

    public void setAppIds(List<String> appIds) {
        this.appIds = appIds;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.TASK_KILL_RESPONSE);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "TaskKillResponseCommand{" +
                "taskInstanceId=" + taskInstanceId +
                ", host='" + host + '\'' +
                ", status=" + status +
                ", processId=" + processId +
                ", appIds=" + appIds +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 4c0c3e8..9c46ad6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -126,9 +126,9 @@ public class MasterServer implements IStoppable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(45678);
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
         this.nettyRemotingServer.start();
 
         //
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index a62ee49..031d8b2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -18,8 +18,8 @@
 package org.apache.dolphinscheduler.server.master.cache;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
@@ -47,14 +47,14 @@ public interface TaskInstanceCacheManager {
      *
      * @param taskAckCommand taskAckCommand
      */
-    void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
+    void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
 
     /**
      * cache taskInstance
      *
-     * @param executeTaskResponseCommand executeTaskResponseCommand
+     * @param taskExecuteResponseCommand taskExecuteResponseCommand
      */
-    void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand);
+    void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
 
     /**
      * remove taskInstance by taskInstanceId
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index dc775d8..c149ac3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache.impl;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -86,7 +86,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
      * @param taskAckCommand taskAckCommand
      */
     @Override
-    public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
+    public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
         TaskInstance taskInstance = new TaskInstance();
         taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
         taskInstance.setStartTime(taskAckCommand.getStartTime());
@@ -99,13 +99,13 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
     /**
      * cache taskInstance
      *
-     * @param executeTaskResponseCommand executeTaskResponseCommand
+     * @param taskExecuteResponseCommand taskExecuteResponseCommand
      */
     @Override
-    public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) {
-        TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
-        taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
-        taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
+    public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
+        TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
+        taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
+        taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index f1707df..c8cd9d1 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -42,6 +42,14 @@ public interface ExecutorManager<T> {
     T execute(ExecutionContext context) throws ExecuteException;
 
     /**
+     * execute task directly without retry
+     * @param context context
+     * @return T
+     * @throws ExecuteException
+     */
+    void executeDirectly(ExecutionContext context) throws ExecuteException;
+
+    /**
      *  after execute
      * @param context context
      * @throws ExecuteException
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 544a958..286b0e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.slf4j.Logger;
@@ -68,8 +69,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
          * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
          * register EXECUTE_TASK_ACK command type TaskAckProcessor
          */
-        this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
-        this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
     }
 
 
@@ -128,13 +130,19 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         return success;
     }
 
+    public void executeDirectly(ExecutionContext context) throws ExecuteException {
+        Command command = buildCommand(context);
+        Host host = context.getHost();
+        doExecute(host,command);
+    }
+
     /**
      *  build command
      * @param context context
      * @return command
      */
     private Command buildCommand(ExecutionContext context) {
-        ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
+        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
         ExecutorType executorType = context.getExecutorType();
         switch (executorType){
             case WORKER:
@@ -203,4 +211,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         }
         return nodes;
     }
+
+    public NettyRemotingClient getNettyRemotingClient() {
+        return nettyRemotingClient;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
deleted file mode 100644
index 54d0022..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.dispatch.executor;
-
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
-import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
-import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- *  netty executor manager
- */
-@Service
-public class NettyKillManager extends AbstractExecutorManager<Boolean>{
-
-    private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
-    /**
-     * netty remote client
-     */
-    private final NettyRemotingClient nettyRemotingClient;
-
-    public NettyKillManager(){
-        final NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
-        /**
-         * register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
-         */
-        this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
-    }
-
-    /**
-     * execute logic
-     *
-     * @param context context
-     * @return result
-     * @throws ExecuteException
-     */
-    @Override
-    public Boolean execute(ExecutionContext context) throws ExecuteException {
-        Host host = context.getHost();
-        Command command = buildCommand(context);
-        try {
-            doExecute(host, command);
-            return true;
-        }catch (ExecuteException ex) {
-            logger.error(String.format("execute context : %s error", context.getContext()), ex);
-            return false;
-        }
-    }
-
-
-    private Command buildCommand(ExecutionContext context) {
-        KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
-        TaskExecutionContext taskExecutionContext = context.getContext();
-
-        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
-        return requestCommand.convert2Command();
-    }
-
-    /**
-     *  execute logic
-     * @param host host
-     * @param command command
-     * @throws ExecuteException
-     */
-    private void doExecute(final Host host, final Command command) throws ExecuteException {
-        /**
-         * retry count,default retry 3
-         */
-        int retryCount = 3;
-        boolean success = false;
-        do {
-            try {
-                nettyRemotingClient.send(host, command);
-                success = true;
-            } catch (Exception ex) {
-                logger.error(String.format("send command : %s to %s error", command, host), ex);
-                retryCount--;
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException ignore) {}
-            }
-        } while (retryCount >= 0 && !success);
-
-        if (!success) {
-            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
-        }
-    }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index cf38579..ef2cb67 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -57,12 +57,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
     /**
      *  task ack process
      * @param channel channel channel
-     * @param command command ExecuteTaskAckCommand
+     * @param command command TaskExecuteAckCommand
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
-        ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
+        TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
         logger.info("taskAckCommand : {}", taskAckCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index d6c3f69..4986e89 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -18,19 +18,12 @@
 package org.apache.dolphinscheduler.server.master.processor;
 
 import io.netty.channel.Channel;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
-import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +43,9 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
+        TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
         logger.info("received command : {}", responseCommand);
         logger.info("已经接受到了worker杀任务的回应");
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index ed76153..93ca4ab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -63,9 +63,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
+        TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
         logger.info("received command : {}", responseCommand);
 
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 07f9168..995a4e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
     private TaskInstanceCacheManager taskInstanceCacheManager;
 
 
-    private NettyKillManager nettyKillManager;
+    private NettyExecutorManager nettyExecutorManager;
 
     /**
      * constructor of MasterTaskExecThread
@@ -67,7 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
     public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
         super(taskInstance, processInstance);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
-        this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
+        this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
     }
 
     /**
@@ -191,7 +191,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         host.setPort(12346);
         executionContext.setHost(host);
 
-        nettyKillManager.execute(executionContext);
+        nettyExecutorManager.executeDirectly(executionContext);
 
         logger.info("master add kill task :{} id:{} to kill queue",
                 taskInstance.getName(), taskInstance.getId() );
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ec43dd8..cfb94b5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -127,8 +127,8 @@ public class WorkerServer implements IStoppable {
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor());
-        this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
         this.nettyRemotingServer.start();
 
         this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java
deleted file mode 100644
index 65342bd..0000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.processor;
-
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- *  taks callback service
- */
-public class KillTaskCallbackService {
-
-    private final Logger logger = LoggerFactory.getLogger(KillTaskCallbackService.class);
-
-    /**
-     *  remote channels
-     */
-    private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
-
-    /**
-     * netty remoting client
-     */
-    private final NettyRemotingClient nettyRemotingClient;
-
-
-    public KillTaskCallbackService(){
-        final NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
-    }
-
-    /**
-     *  add callback channel
-     * @param taskInstanceId taskInstanceId
-     * @param channel  channel
-     */
-    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     *  get callback channel
-     * @param taskInstanceId taskInstanceId
-     * @return callback channel
-     */
-    public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
-        NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
-        if(nettyRemoteChannel.isActive()){
-            return nettyRemoteChannel;
-        }
-        Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
-        if(newChannel != null){
-            NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
-            addRemoteChannel(taskInstanceId, remoteChannel);
-            return remoteChannel;
-        }
-        return null;
-    }
-
-    /**
-     *  remove callback channels
-     * @param taskInstanceId taskInstanceId
-     */
-    public void remove(int taskInstanceId){
-        REMOTE_CHANNELS.remove(taskInstanceId);
-    }
-
-    /**
-     *  send result
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param killTaskResponseCommand killTaskResponseCommand
-     */
-    public void sendKillResult(int taskInstanceId, KillTaskResponseCommand killTaskResponseCommand){
-        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if(nettyRemoteChannel == null){
-            //TODO
-        } else{
-            nettyRemoteChannel.writeAndFlush(killTaskResponseCommand.convert2Command()).addListener(new ChannelFutureListener(){
-
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if(future.isSuccess()){
-                        remove(taskInstanceId);
-                        return;
-                    }
-                }
-            });
-        }
-    }
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index 23ac7e2..02d889b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -22,17 +22,18 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
  *  taks callback service
  */
+@Service
 public class TaskCallbackService {
 
     private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
@@ -92,14 +93,14 @@ public class TaskCallbackService {
     /**
      *  send ack
      * @param taskInstanceId taskInstanceId
-     * @param ackCommand ackCommand
+     * @param command command
      */
-    public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
+    public void sendAck(int taskInstanceId, Command command){
         NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
         if(nettyRemoteChannel == null){
             //TODO
         } else{
-            nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
+            nettyRemoteChannel.writeAndFlush(command);
         }
     }
 
@@ -107,14 +108,14 @@ public class TaskCallbackService {
      *  send result
      *
      * @param taskInstanceId taskInstanceId
-     * @param responseCommand responseCommand
+     * @param command command
      */
-    public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
+    public void sendResult(int taskInstanceId, Command command){
         NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
         if(nettyRemoteChannel == null){
             //TODO
         } else{
-            nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
+            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
 
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 98e4e92..b53ef17 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@@ -69,18 +69,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     private final TaskCallbackService taskCallbackService;
 
     public TaskExecuteProcessor(){
-        this.taskCallbackService = new TaskCallbackService();
+        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
     }
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
+        Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                 String.format("invalid command type : %s", command.getType()));
         logger.info("received command : {}", command);
-        ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
-                command.getBody(), ExecuteTaskRequestCommand.class);
+        TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
+                command.getBody(), TaskExecuteRequestCommand.class);
 
         String contextJson = taskRequestCommand.getTaskExecutionContext();
 
@@ -105,8 +105,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
     private void doAck(TaskExecutionContext taskExecutionContext){
         // tell master that task is in executing
-        ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
-        taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+        TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
+        taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
     }
 
     /**
@@ -134,10 +134,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     /**
      * build ack command
      * @param taskExecutionContext taskExecutionContext
-     * @return ExecuteTaskAckCommand
+     * @return TaskExecuteAckCommand
      */
-    private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
-        ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
+    private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
+        TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
         ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
         ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index c910aed..c23f199 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@@ -58,7 +58,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
     /**
      *  task callback service
      */
-    private final KillTaskCallbackService killTaskCallbackService;
+    private final TaskCallbackService taskCallbackService;
 
     /**
      * taskExecutionContextCacheManager
@@ -72,7 +72,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
 
     public TaskKillProcessor(){
-        this.killTaskCallbackService = new KillTaskCallbackService();
+        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
     }
@@ -115,41 +115,41 @@ public class TaskKillProcessor implements NettyRequestProcessor {
 
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
-        KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
-        logger.info("received command : {}", killTaskRequestCommand);
+        Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
+        TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
+        logger.info("received command : {}", taskKillRequestCommand);
 
 
-        String contextJson = killTaskRequestCommand.getTaskExecutionContext();
+        String contextJson = taskKillRequestCommand.getTaskExecutionContext();
 
         TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
 
         Boolean killStatus = doKill(taskExecutionContext);
 
-        killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+        taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
                 new NettyRemoteChannel(channel, command.getOpaque()));
 
-        KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
-        killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
+        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
+        taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
     }
 
     /**
-     * build KillTaskResponseCommand
+     * build TaskKillResponseCommand
      *
      * @param taskExecutionContext taskExecutionContext
      * @param killStatus killStatus
-     * @return build KillTaskResponseCommand
+     * @return build TaskKillResponseCommand
      */
-    private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
+    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
                                                                  Boolean killStatus) {
-        KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
-        killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        killTaskResponseCommand.setHost(taskExecutionContext.getHost());
-        killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
-        killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
-        killTaskResponseCommand.setAppIds(appIds);
-
-        return killTaskResponseCommand;
+        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
+        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
+        taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
+        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
+        taskKillResponseCommand.setAppIds(appIds);
+
+        return taskKillResponseCommand;
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d3161ec..347dfb6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@@ -75,7 +75,7 @@ public class TaskExecuteThread implements Runnable {
     @Override
     public void run() {
 
-        ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
         try {
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
             // task node
@@ -134,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
             responseCommand.setProcessId(task.getProcessId());
             responseCommand.setAppIds(task.getAppIds());
         } finally {
-            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
+            taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
         }
     }