You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/02 13:30:16 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: master add kill task logic (#2058)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new bb3885c  master add kill task logic (#2058)
bb3885c is described below

commit bb3885cfe29a0f916ae1468f6403d80551ab91f6
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Mar 2 21:30:04 2020 +0800

    master add kill task logic (#2058)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatcht task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replase
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
---
 .../dolphinscheduler/common/enums/DbType.java      |  10 ++
 .../remote/command/KillTaskRequestCommand.java     |   2 +-
 .../builder/TaskExecutionContextBuilder.java       |   7 +-
 .../server/entity/DataxTaskExecutionContext.java   | 116 ++++++++++++++++++++
 .../server/entity/SQLTaskExecutionContext.java     |  64 +++++++++++
 .../server}/entity/TaskExecutionContext.java       |  37 ++++++-
 .../server/master/MasterServer.java                |   2 +
 .../master/cache/TaskInstanceCacheManager.java     |   2 +-
 .../cache/impl/TaskInstanceCacheManagerImpl.java   |   2 +-
 .../server/master/dispatch/ExecutorDispatcher.java |   4 +-
 .../master/dispatch/context/ExecutionContext.java  |   2 +-
 .../dispatch/executor/AbstractExecutorManager.java |   1 +
 .../master/dispatch/executor/ExecutorManager.java  |   4 +-
 .../dispatch/executor/NettyExecutorManager.java    |   6 +-
 .../master/dispatch/executor/NettyKillManager.java | 119 +++++++++++++++++++++
 .../host/assign/LowerWeightRoundRobin.java         |   6 ++
 .../server/master/future/TaskFuture.java           |   2 +-
 ...ocessor.java => TaskKillResponseProcessor.java} |  32 ++----
 .../master/processor/TaskResponseProcessor.java    |   2 +
 .../server/master/registry/MasterRegistry.java     |   3 +-
 .../master/runner/MasterBaseTaskExecThread.java    |   4 +-
 .../server/master/runner/MasterExecThread.java     |   5 +-
 .../server/master/runner/MasterTaskExecThread.java |  42 +++++---
 .../dolphinscheduler/server/monitor/Monitor.java   |   5 +
 .../server/registry/ZookeeperNodeManager.java      |   6 +-
 .../server/registry/ZookeeperRegistryCenter.java   |   6 +-
 .../dolphinscheduler/server/utils/ParamUtils.java  |   3 +-
 .../server/utils/ProcessUtils.java                 |   3 +-
 .../cache/TaskExecutionContextCacheManager.java    |   3 +-
 .../impl/TaskExecutionContextCacheManagerImpl.java |   2 +-
 .../worker/processor/TaskExecuteProcessor.java     |   3 +-
 .../server/worker/processor/TaskKillProcessor.java |  49 +++++----
 .../server/worker/registry/WorkerRegistry.java     |   4 +-
 .../server/worker/runner/TaskExecuteThread.java    |   2 +-
 .../worker/task/AbstractCommandExecutor.java       |  23 ++--
 .../server/worker/task/AbstractTask.java           |   2 +-
 .../server/worker/task/AbstractYarnTask.java       |   2 +-
 .../server/worker/task/PythonCommandExecutor.java  |   2 +-
 .../server/worker/task/ShellCommandExecutor.java   |   3 +-
 .../server/worker/task/TaskManager.java            |   2 +-
 .../server/worker/task/TaskProps.java              |  28 ++---
 .../server/worker/task/datax/DataxTask.java        |  60 +++++------
 .../server/worker/task/flink/FlinkTask.java        |   4 +-
 .../server/worker/task/http/HttpTask.java          |  12 +--
 .../server/worker/task/mr/MapReduceTask.java       |   2 +-
 .../worker/task/processdure/ProcedureTask.java     |   3 +-
 .../server/worker/task/python/PythonTask.java      |   5 +-
 .../server/worker/task/shell/ShellTask.java        |  17 ++-
 .../server/worker/task/spark/SparkTask.java        |   3 +-
 .../server/worker/task/sql/SqlTask.java            |  65 +++--------
 .../dolphinscheduler/server/zk/ZKMasterClient.java |   6 +-
 .../dolphinscheduler/server/zk/ZKWorkerClient.java |   2 +
 .../service/process/ProcessService.java            |   4 +
 53 files changed, 556 insertions(+), 249 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
index 5fb245a..cc3a295 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
@@ -57,4 +57,14 @@ public enum DbType {
     public String getDescp() {
         return descp;
     }
+
+
+    public static DbType of(int type){
+        for(DbType ty : values()){
+            if(ty.getCode() == type){
+                return ty;
+            }
+        }
+        throw new IllegalArgumentException("invalid type : " + type);
+    }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
index 2e87540..b8cfd89 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public class KillTaskRequestCommand implements Serializable {

    /**
     *  taskInstanceId
     */
    private int taskInstanceId;

    /**
     * processId
     */
    private int processId;

    /**
     * host
     */
    private String host;

    /**
     * tenantCode
     */
    private String tenantCode;

    /**
     * logPath
     */
    private String logPath;

    /**
     * executePath
     */
    private String executePath;

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public int getTaskInstanceId() {
        return taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getProcessId() {
        return processId;
    }

    public void setProcessId(int processId) {
        this.processId = processId;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getTenantCode() {
        return tenantCode;
    }

    public void setTenantCode(String tenantCode) {
        this.tenantCode = tenantCode;
    }

    public String getExecutePath() {
        return executePath;
    }

    public void setExecutePath(String executePath) {
        this.executePath = executePath;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.KILL_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }


}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  kill task request command
 */
public class KillTaskRequestCommand implements Serializable {

    /**
     *  task execution context
     */
    private String taskExecutionContext;

    public String getTaskExecutionContext() {
        return taskExecutionContext;
    }

    public void setTaskExecutionContext(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    public KillTaskRequestCommand() {
    }

    public KillTaskRequestCommand(String taskExecutionContext) {
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.KILL_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "KillTaskRequestCommand{" +
                "taskExecutionContext='" + taskExecutionContext + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 8cdd13e..1388e79 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,10 +17,11 @@
 
 package org.apache.dolphinscheduler.server.builder;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
  *  TaskExecutionContext builder
@@ -47,6 +48,8 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setLogPath(taskInstance.getLogPath());
         taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
+        taskExecutionContext.setHost(taskInstance.getHost());
+        taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
         return this;
     }
 
@@ -54,7 +57,7 @@ public class TaskExecutionContextBuilder {
     /**
      * build processInstance related info
      *
-     * @param processInstance
+     * @param processInstance processInstance
      * @return TaskExecutionContextBuilder
      */
     public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
new file mode 100644
index 0000000..dd8d646
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import java.io.Serializable;
+
+/**
+ *  master/worker task transport
+ */
+public class DataxTaskExecutionContext implements Serializable{
+
+    /**
+     * dataSourceId
+     */
+    private int dataSourceId;
+
+    /**
+     * sourcetype
+     */
+    private int sourcetype;
+
+    /**
+     * sourceConnectionParams
+     */
+    private String sourceConnectionParams;
+
+    /**
+     * dataTargetId
+     */
+    private int dataTargetId;
+
+    /**
+     * targetType
+     */
+    private int targetType;
+
+    /**
+     * targetConnectionParams
+     */
+    private String targetConnectionParams;
+
+    public int getDataSourceId() {
+        return dataSourceId;
+    }
+
+    public void setDataSourceId(int dataSourceId) {
+        this.dataSourceId = dataSourceId;
+    }
+
+    public int getSourcetype() {
+        return sourcetype;
+    }
+
+    public void setSourcetype(int sourcetype) {
+        this.sourcetype = sourcetype;
+    }
+
+    public String getSourceConnectionParams() {
+        return sourceConnectionParams;
+    }
+
+    public void setSourceConnectionParams(String sourceConnectionParams) {
+        this.sourceConnectionParams = sourceConnectionParams;
+    }
+
+    public int getDataTargetId() {
+        return dataTargetId;
+    }
+
+    public void setDataTargetId(int dataTargetId) {
+        this.dataTargetId = dataTargetId;
+    }
+
+    public int getTargetType() {
+        return targetType;
+    }
+
+    public void setTargetType(int targetType) {
+        this.targetType = targetType;
+    }
+
+    public String getTargetConnectionParams() {
+        return targetConnectionParams;
+    }
+
+    public void setTargetConnectionParams(String targetConnectionParams) {
+        this.targetConnectionParams = targetConnectionParams;
+    }
+
+    @Override
+    public String toString() {
+        return "DataxTaskExecutionContext{" +
+                "dataSourceId=" + dataSourceId +
+                ", sourcetype=" + sourcetype +
+                ", sourceConnectionParams='" + sourceConnectionParams + '\'' +
+                ", dataTargetId=" + dataTargetId +
+                ", targetType=" + targetType +
+                ", targetConnectionParams='" + targetConnectionParams + '\'' +
+                '}';
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
new file mode 100644
index 0000000..b1ec20d
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ *  SQL Task ExecutionContext
+ */
+public class SQLTaskExecutionContext implements Serializable {
+
+
+    /**
+     * warningGroupId
+     */
+    private int warningGroupId;
+    /**
+     * udf function list
+     */
+    private List<UdfFunc> udfFuncList;
+
+
+    public int getWarningGroupId() {
+        return warningGroupId;
+    }
+
+    public void setWarningGroupId(int warningGroupId) {
+        this.warningGroupId = warningGroupId;
+    }
+
+    public List<UdfFunc> getUdfFuncList() {
+        return udfFuncList;
+    }
+
+    public void setUdfFuncList(List<UdfFunc> udfFuncList) {
+        this.udfFuncList = udfFuncList;
+    }
+
+    @Override
+    public String toString() {
+        return "SQLTaskExecutionContext{" +
+                "warningGroupId=" + warningGroupId +
+                ", udfFuncList=" + udfFuncList +
+                '}';
+    }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
similarity index 89%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 3ed71e5..fb3aab9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -15,12 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.remote.entity;
+package org.apache.dolphinscheduler.server.entity;
 
 import java.io.Serializable;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -144,7 +142,6 @@ public class TaskExecutionContext implements Serializable{
      */
     private Map<String, String> definedParams;
 
-
     /**
      * task AppId
      */
@@ -165,6 +162,20 @@ public class TaskExecutionContext implements Serializable{
      */
     private String workerGroup;
 
+
+    /**
+     *  sql TaskExecutionContext
+     */
+    private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+    /**
+     *  datax TaskExecutionContext
+     */
+    private DataxTaskExecutionContext dataxTaskExecutionContext;
+
+
+
+
     public String getWorkerGroup() {
         return workerGroup;
     }
@@ -373,6 +384,21 @@ public class TaskExecutionContext implements Serializable{
         this.appIds = appIds;
     }
 
+    public SQLTaskExecutionContext getSqlTaskExecutionContext() {
+        return sqlTaskExecutionContext;
+    }
+
+    public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) {
+        this.sqlTaskExecutionContext = sqlTaskExecutionContext;
+    }
+
+    public DataxTaskExecutionContext getDataxTaskExecutionContext() {
+        return dataxTaskExecutionContext;
+    }
+
+    public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) {
+        this.dataxTaskExecutionContext = dataxTaskExecutionContext;
+    }
 
     @Override
     public String toString() {
@@ -402,6 +428,9 @@ public class TaskExecutionContext implements Serializable{
                 ", taskAppId='" + taskAppId + '\'' +
                 ", taskTimeoutStrategy=" + taskTimeoutStrategy +
                 ", taskTimeout=" + taskTimeout +
+                ", workerGroup='" + workerGroup + '\'' +
+                ", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
+                ", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
                 '}';
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7c33b90..4c0c3e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
@@ -127,6 +128,7 @@ public class MasterServer implements IStoppable {
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
         this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
         this.nettyRemotingServer.start();
 
         //
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index 98d2a24..a62ee49 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
  *  task instance state manager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 6624eeb..dc775d8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index c597dc1..df563a6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -67,8 +67,10 @@ public class ExecutorDispatcher implements InitializingBean {
     }
 
     /**
-     *  task dispatch
+     * task dispatch
+     *
      * @param context context
+     * @return result
      * @throws ExecuteException
      */
     public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 5157dd2..19124d3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,8 +17,8 @@
 package org.apache.dolphinscheduler.server.master.dispatch.context;
 
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
 /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index c0be5a8..9e4c222 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -27,6 +27,7 @@ public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
 
     /**
      * before execute , add time monitor , timeout
+     *
      * @param context context
      * @throws ExecuteException
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 9b0b9af..f1707df 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -27,6 +27,7 @@ public interface ExecutorManager<T> {
 
     /**
      * before execute
+     *
      * @param executeContext executeContext
      * @throws ExecuteException
      */
@@ -35,12 +36,13 @@ public interface ExecutorManager<T> {
     /**
      * execute task
      * @param context context
+     * @return T
      * @throws ExecuteException
      */
     T execute(ExecutionContext context) throws ExecuteException;
 
     /**
-     * after execute
+     *  after execute
      * @param context context
      * @throws ExecuteException
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index f4b1dab..544a958 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -72,9 +72,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
     }
 
+
     /**
-     *  execute logic
+     * execute logic
      * @param context context
+     * @return result
      * @throws ExecuteException
      */
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
new file mode 100644
index 0000000..54d0022
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * netty kill manager, sends kill task request commands to the host of an execution context
+ */
+@Service
+public class NettyKillManager extends AbstractExecutorManager<Boolean>{
+
+    private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
+    /**
+     * netty remote client
+     */
+    private final NettyRemotingClient nettyRemotingClient;
+
+    /** create the netty client and register TaskKillResponseProcessor for KILL_TASK_RESPONSE */
+    public NettyKillManager(){
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
+    }
+
+    /**
+     * build the kill task command from the context and send it to the context host
+     * @param context context
+     * @return true if the command was sent, false if sending failed
+     * @throws ExecuteException kept in the signature, send failures are caught here and returned as false
+     */
+    @Override
+    public Boolean execute(ExecutionContext context) throws ExecuteException {
+        Host host = context.getHost();
+        Command command = buildCommand(context);
+        try {
+            doExecute(host, command);
+            return true;
+        }catch (ExecuteException ex) {
+            logger.error(String.format("execute context : %s error", context.getContext()), ex);
+            return false;
+        }
+    }
+
+    /**
+     * build kill task command carrying the serialized task execution context
+     * @param context context
+     * @return command
+     */
+    private Command buildCommand(ExecutionContext context) {
+        KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
+        TaskExecutionContext taskExecutionContext = context.getContext();
+        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+        return requestCommand.convert2Command();
+    }
+
+    /**
+     * send the command, one attempt plus up to 3 retries with a 100 ms pause after each failure
+     * @param host host
+     * @param command command
+     * @throws ExecuteException if the command could not be sent
+     */
+    private void doExecute(final Host host, final Command command) throws ExecuteException {
+        int retryCount = 3;
+        boolean success = false;
+        do {
+            try {
+                nettyRemotingClient.send(host, command);
+                success = true;
+            } catch (Exception ex) {
+                logger.error(String.format("send command : %s to %s error", command, host), ex);
+                retryCount--;
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException interrupted) {
+                    // restore the interrupt flag and stop retrying instead of swallowing it
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } while (retryCount >= 0 && !success);
+        if (!success) {
+            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
index cadf418..bdf0f41 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -24,6 +24,12 @@ import java.util.Collection;
  */
 public class LowerWeightRoundRobin implements Selector<HostWeight>{
 
+    /**
+     * select
+     * @param sources sources
+     * @return HostWeight
+     */
+    @Override
     public HostWeight select(Collection<HostWeight> sources){
         int totalWeight = 0;
         int lowWeight = 0;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
index 0c6d740..d22c6f2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -68,7 +68,7 @@ public class TaskFuture {
     }
 
     /**
-     *  wait for response
+     * wait for response
      * @return command
      * @throws InterruptedException
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
similarity index 64%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index d6279c6..d6c3f69 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -23,36 +23,23 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  *  task response processor
  */
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskKillResponseProcessor implements NettyRequestProcessor {
 
-    private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
-
-    /**
-     * process service
-     */
-    private final ProcessService processService;
-
-    /**
-     * taskInstance cache manager
-     */
-    private final TaskInstanceCacheManager taskInstanceCacheManager;
-
-    public TaskResponseProcessor(){
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
-        this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
-    }
+    private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
 
     /**
      * task final result response
@@ -63,16 +50,11 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     @Override
     public void process(Channel channel, Command command) {
-        Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+        Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
 
-        ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
+        KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
         logger.info("received command : {}", responseCommand);
-
-        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
-        processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
-                responseCommand.getEndTime(),
-                responseCommand.getTaskInstanceId());
+        logger.info("已经接受到了worker杀任务的回应");
     }
 
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index d6279c6..ed76153 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -72,6 +72,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
 
         processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
+                responseCommand.getProcessId(),
+                responseCommand.getAppIds(),
                 responseCommand.getTaskInstanceId());
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index ebfb2f4..1eb06b6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -67,9 +67,10 @@ public class MasterRegistry {
     private final String startTime;
 
     /**
-     *  construct
+     * construct
      * @param zookeeperRegistryCenter zookeeperRegistryCenter
      * @param port port
+     * @param heartBeatInterval heartBeatInterval
      */
     public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
         this.zookeeperRegistryCenter = zookeeperRegistryCenter;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 7812dbf..9d40de9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -147,7 +147,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      * @param taskInstance taskInstance
      * @return TaskExecutionContext
      */
-    private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
+    protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
         taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
 
         Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 576dc76..a05f8dc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -142,8 +142,9 @@ public class MasterExecThread implements Runnable {
 
     /**
      * constructor of MasterExecThread
-     * @param processInstance   process instance
-     * @param processService        process dao
+     * @param processInstance processInstance
+     * @param processService processService
+     * @param nettyRemotingClient nettyRemotingClient
      */
     public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
         this.processService = processService;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index feba5a2..07f9168 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 
-import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_KILL;
 
 /**
  * master task exec thread
@@ -52,6 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
      */
     private TaskInstanceCacheManager taskInstanceCacheManager;
 
+
+    private NettyKillManager nettyKillManager;
+
     /**
      * constructor of MasterTaskExecThread
      * @param taskInstance      task instance
@@ -60,6 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
     public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
         super(taskInstance, processInstance);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+        this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
     }
 
     /**
@@ -78,6 +86,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
     /**
      * TODO submit task instance and wait complete
+     *
      * @return true is task quit is true
      */
     @Override
@@ -99,14 +108,14 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
     }
 
     /**
-     * TODO 在这里轮询数据库
+     * TODO polling db
      *
      * wait task quit
      * @return true if task quit success
      */
     public Boolean waitTaskQuit(){
         // query new state
-        taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
+        taskInstance = processService.findTaskInstanceById(taskInstance.getId());
         logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
                 this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
         // task time out
@@ -147,7 +156,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
                     }
                 }
                 // updateProcessInstance task instance
-                taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
+                taskInstance = processService.findTaskInstanceById(taskInstance.getId());
                 processInstance = processService.findProcessInstanceById(processInstance.getId());
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
@@ -163,23 +172,26 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
 
     /**
-     *  TODO Kill 任务
+     *  TODO Kill TASK
      *
      *  task instance add queue , waiting worker to kill
      */
-    private void cancelTaskInstance(){
+    private void cancelTaskInstance() throws Exception{
         if(alreadyKilled){
             return ;
         }
         alreadyKilled = true;
-        String host = taskInstance.getHost();
-        if(host == null){
-            host = Constants.NULL;
-        }
-        String queueValue = String.format("%s-%d",
-                host, taskInstance.getId());
-        // TODO 这里写
-        taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
+
+        TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
+
+        ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
+
+        Host host = new Host();
+        host.setIp(taskInstance.getHost());
+        host.setPort(12346);
+        executionContext.setHost(host);
+
+        nettyKillManager.execute(executionContext);
 
         logger.info("master add kill task :{} id:{} to kill queue",
                 taskInstance.getName(), taskInstance.getId() );
@@ -197,7 +209,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
 
 
     /**
-     * get remain time(s)
+     * get remain time (s)
      *
      * @return remain time
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
index 3ee9488..8d7bf0b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
@@ -23,6 +23,11 @@ public interface Monitor {
 
     /**
      * monitor server and restart
+     *
+     * @param masterPath masterPath
+     * @param workerPath workerPath
+     * @param port port
+     * @param installPath installPath
      */
     void monitor(String masterPath, String workerPath, Integer port, String installPath);
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 25355e2..9a4a7ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -74,7 +74,7 @@ public class ZookeeperNodeManager implements InitializingBean {
     private ZookeeperRegistryCenter registryCenter;
 
     /**
-     *  init listener
+     * init listener
      * @throws Exception
      */
     @Override
@@ -234,8 +234,8 @@ public class ZookeeperNodeManager implements InitializingBean {
 
     /**
      * get worker group nodes
-     * @param workerGroup
-     * @return
+     * @param workerGroup workerGroup
+     * @return worker nodes
      */
     public Set<String> getWorkerGroupNodes(String workerGroup){
         workerGroupLock.lock();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index a6a3ea0..b186a42 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -129,7 +129,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
 
     /**
      * get worker group directly
-     * @return
+     * @return worker group nodes
      */
     public Set<String> getWorkerGroupDirectly() {
         List<String> workers = getChildrenKeys(getWorkerPath());
@@ -166,8 +166,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
 
     /**
      * get worker group path
-     * @param workerGroup
-     * @return
+     * @param workerGroup workerGroup
+     * @return worker group path
      */
     public String getWorkerGroupPath(String workerGroup) {
         return WORKER_PATH + "/" + workerGroup;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 3aba546..063a7d7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -111,7 +111,8 @@ public class ParamUtils {
 
     /**
      * get parameters map
-     * @return user defined params map
+     * @param definedParams definedParams
+     * @return parameters map
      */
     public static Map<String,Property> getUserDefParamsMap(Map<String,String> definedParams) {
         if (definedParams != null) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index c6efbae..ee1f091 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.commons.io.FileUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
index db78127..7df8e01 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
@@ -17,7 +17,8 @@
 
 package org.apache.dolphinscheduler.server.worker.cache;
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
  *  TaskExecutionContextCacheManager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
index 584c42b..009332f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.cache.impl;
 
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.springframework.stereotype.Service;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3c79e8c..98e4e92 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -33,13 +33,12 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 1ea7394..c910aed 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
+import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -28,9 +29,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
@@ -75,32 +76,35 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
     }
+
     /**
      * kill task logic
      *
-     * @param killCommand killCommand
+     * @param context context
+     * @return execute result
      */
-    private Boolean doKill(KillTaskRequestCommand killCommand){
+    private Boolean doKill(TaskExecutionContext context){
         try {
-            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+            TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
+            context.setProcessId(taskExecutionContext.getProcessId());
 
             Integer processId = taskExecutionContext.getProcessId();
 
             if (processId == null || processId.equals(0)){
-                logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
+                logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
                 return false;
             }
 
-            killCommand.setProcessId(processId);
 
-            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId()));
+            String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
 
-            logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd);
+            logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
 
             OSUtils.exeCmd(cmd);
 
+
             // find log and kill yarn job
-            killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode());
+            killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
 
             return true;
         } catch (Exception e) {
@@ -115,29 +119,37 @@ public class TaskKillProcessor implements NettyRequestProcessor {
         KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
         logger.info("received command : {}", killTaskRequestCommand);
 
-        Boolean killStatus = doKill(killTaskRequestCommand);
 
-        KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(killTaskRequestCommand,killStatus);
+        String contextJson = killTaskRequestCommand.getTaskExecutionContext();
+
+        TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
+
+        Boolean killStatus = doKill(taskExecutionContext);
+
+        killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+                new NettyRemoteChannel(channel, command.getOpaque()));
+
+        KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
         killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
     }
 
     /**
      * build KillTaskResponseCommand
      *
-     * @param killTaskRequestCommand killTaskRequestCommand
+     * @param taskExecutionContext taskExecutionContext
      * @param killStatus killStatus
-     * @return KillTaskResponseCommand
+     * @return build KillTaskResponseCommand
      */
-    private KillTaskResponseCommand buildKillTaskResponseCommand(KillTaskRequestCommand killTaskRequestCommand,
+    private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
                                                                  Boolean killStatus) {
         KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
-        killTaskResponseCommand.setTaskInstanceId(killTaskRequestCommand.getTaskInstanceId());
-        killTaskResponseCommand.setHost(killTaskRequestCommand.getHost());
+        killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+        killTaskResponseCommand.setHost(taskExecutionContext.getHost());
         killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
-        killTaskResponseCommand.setProcessId(killTaskRequestCommand.getProcessId());
+        killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
         killTaskResponseCommand.setAppIds(appIds);
 
-        return null;
+        return killTaskResponseCommand;
     }
 
     /**
@@ -156,6 +168,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
             String log = null;
             try {
                 logClient = new LogClientService();
+                logger.info("view log host : {},logPath : {}", host,logPath);
                 log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
             } finally {
                 if(logClient != null){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 977643c..a1d5524 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -76,9 +76,11 @@ public class WorkerRegistry {
     private String workerGroup;
 
     /**
-     *  construct
+     * construct
+     *
      * @param zookeeperRegistryCenter zookeeperRegistryCenter
      * @param port port
+     * @param heartBeatInterval heartBeatInterval
      */
     public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
         this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d26feaf..d3161ec 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index e573d3a..7727115 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -21,17 +21,13 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.slf4j.Logger;
 
@@ -67,11 +63,6 @@ public abstract class AbstractCommandExecutor {
     protected Consumer<List<String>> logHandler;
 
     /**
-     *  timeout
-     */
-    protected int timeout;
-
-    /**
      *  logger
      */
     protected Logger logger;
@@ -132,6 +123,7 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * task specific execution logic
+     *
      * @param execCommand execCommand
      * @return CommandExecuteResult
      * @throws Exception
@@ -174,8 +166,6 @@ public abstract class AbstractCommandExecutor {
         // waiting for the run to finish
         boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
 
-        // SHELL task state
-        result.setExitStatusCode(process.exitValue());
 
         logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
                 taskExecutionContext.getExecutePath(),
@@ -195,6 +185,9 @@ public abstract class AbstractCommandExecutor {
             ProcessUtils.kill(taskExecutionContext);
             result.setExitStatusCode(EXIT_CODE_FAILURE);
         }
+
+        // SHELL task state
+        result.setExitStatusCode(process.exitValue());
         return result;
     }
 
@@ -378,7 +371,7 @@ public abstract class AbstractCommandExecutor {
 
         List<String> appIds = new ArrayList<>();
         /**
-         * analysis log,get submited yarn application id
+         * analysis log，get submited yarn application id
          */
         for (String log : logs) {
             String appId = findAppId(log);
@@ -440,13 +433,13 @@ public abstract class AbstractCommandExecutor {
 
 
     /**
-     * get remain time(s)
+     * get remain time（s）
      *
      * @return remain time
      */
     private long getRemaintime() {
         long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000;
-        long remainTime = timeout - usedTime;
+        long remainTime = taskExecutionContext.getTaskTimeout() - usedTime;
 
         if (remainTime < 0) {
             throw new RuntimeException("task execution time out");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index e6dd973..86aed54 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.TaskRecordDao;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 2ce397a..62e35fd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -17,7 +17,7 @@
 package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index ad0671f..344d00f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 877c607..6b25cd3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -17,7 +17,7 @@
 package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index 468375c..1fef7e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.utils.EnumUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
 import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
 import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
index a7b66bb..483dd18 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
@@ -129,19 +129,21 @@ public class TaskProps {
 
   /**
    * constructor
-   * @param taskParams          task params
-   * @param taskDir             task dir
-   * @param scheduleTime        schedule time
-   * @param nodeName            node name
-   * @param taskType            task type
-   * @param taskInstanceId          task instance id
-   * @param envFile             env file
-   * @param tenantCode          tenant code
-   * @param queue               queue
-   * @param taskStartTime       task start time
-   * @param definedParams       defined params
-   * @param dependence          dependence
-   * @param cmdTypeIfComplement cmd type if complement
+   * @param taskParams taskParams
+   * @param scheduleTime scheduleTime
+   * @param nodeName nodeName
+   * @param taskType taskType
+   * @param taskInstanceId taskInstanceId
+   * @param envFile envFile
+   * @param tenantCode tenantCode
+   * @param queue queue
+   * @param taskStartTime taskStartTime
+   * @param definedParams definedParams
+   * @param dependence dependence
+   * @param cmdTypeIfComplement cmdTypeIfComplement
+   * @param host host
+   * @param logPath logPath
+   * @param executePath executePath
    */
   public TaskProps(String taskParams,
                    Date scheduleTime,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 24abe57..3e5aa51 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -50,13 +51,13 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.DataxUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -99,11 +100,6 @@ public class DataxTask extends AbstractTask {
     private DataxParameters dataXParameters;
 
     /**
-     * task dir
-     */
-    private String taskDir;
-
-    /**
      * shell command executor
      */
     private ShellCommandExecutor shellCommandExecutor;
@@ -114,11 +110,6 @@ public class DataxTask extends AbstractTask {
     private TaskExecutionContext taskExecutionContext;
 
     /**
-     * processService
-     */
-    private ProcessService processService;
-
-    /**
      * constructor
      * @param taskExecutionContext taskExecutionContext
      * @param logger logger
@@ -127,13 +118,9 @@ public class DataxTask extends AbstractTask {
         super(taskExecutionContext, logger);
         this.taskExecutionContext = taskExecutionContext;
 
-        logger.info("task dir : {}", taskDir);
 
         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
                 taskExecutionContext,logger);
-
-        processService = SpringApplicationContext.getBean(ProcessService.class);
-
     }
 
     /**
@@ -151,12 +138,11 @@ public class DataxTask extends AbstractTask {
 
     /**
      * run DataX process
-     * 
+     *
      * @throws Exception
      */
     @Override
-    public void handle()
-        throws Exception {
+    public void handle() throws Exception {
         try {
             // set the name of the current thread
             String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
@@ -180,8 +166,8 @@ public class DataxTask extends AbstractTask {
 
     /**
      * cancel DataX process
-     * 
-     * @param cancelApplication
+     *
+     * @param cancelApplication cancelApplication
      * @throws Exception
      */
     @Override
@@ -200,7 +186,9 @@ public class DataxTask extends AbstractTask {
     private String buildDataxJsonFile()
         throws Exception {
         // generate json
-        String fileName = String.format("%s/%s_job.json", taskDir, taskExecutionContext.getTaskAppId());
+        String fileName = String.format("%s/%s_job.json",
+                taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTaskAppId());
 
         Path path = new File(fileName).toPath();
         if (Files.exists(path)) {
@@ -230,13 +218,14 @@ public class DataxTask extends AbstractTask {
      */
     private List<JSONObject> buildDataxJobContentJson()
         throws SQLException {
-        DataSource dataSource = processService.findDataSourceById(dataXParameters.getDataSource());
-        BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
-            dataSource.getConnectionParams());
+        DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();
 
-        DataSource dataTarget = processService.findDataSourceById(dataXParameters.getDataTarget());
-        BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
-            dataTarget.getConnectionParams());
+
+        BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()),
+                dataxTaskExecutionContext.getSourceConnectionParams());
+
+        BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()),
+                dataxTaskExecutionContext.getTargetConnectionParams());
 
         List<JSONObject> readerConnArr = new ArrayList<>();
         JSONObject readerConn = new JSONObject();
@@ -250,7 +239,7 @@ public class DataxTask extends AbstractTask {
         readerParam.put("connection", readerConnArr);
 
         JSONObject reader = new JSONObject();
-        reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType()));
+        reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
         reader.put("parameter", readerParam);
 
         List<JSONObject> writerConnArr = new ArrayList<>();
@@ -263,7 +252,9 @@ public class DataxTask extends AbstractTask {
         writerParam.put("username", dataTargetCfg.getUser());
         writerParam.put("password", dataTargetCfg.getPassword());
         writerParam.put("column",
-            parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql()));
+            parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
+                    DbType.of(dataxTaskExecutionContext.getTargetType()),
+                    dataSourceCfg, dataXParameters.getSql()));
         writerParam.put("connection", writerConnArr);
 
         if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
@@ -275,7 +266,7 @@ public class DataxTask extends AbstractTask {
         }
 
         JSONObject writer = new JSONObject();
-        writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType()));
+        writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType())));
         writer.put("parameter", writerParam);
 
         List<JSONObject> contentList = new ArrayList<>();
@@ -348,7 +339,9 @@ public class DataxTask extends AbstractTask {
     private String buildShellCommandFile(String jobConfigFilePath)
         throws Exception {
         // generate scripts
-        String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
+        String fileName = String.format("%s/%s_node.sh",
+                taskExecutionContext.getExecutePath(),
+                taskExecutionContext.getTaskAppId());
         Path path = new File(fileName).toPath();
 
         if (Files.exists(path)) {
@@ -364,9 +357,6 @@ public class DataxTask extends AbstractTask {
         sbr.append(jobConfigFilePath);
         String dataxCommand = sbr.toString();
 
-        // find process instance by task id
-        ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
-
         // combining local and global parameters
         // replace placeholder
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index f9ef958..f264749 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -23,12 +23,10 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index d5d2bb2..74a1728 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -30,13 +30,9 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.http.HttpEntity;
 import org.apache.http.ParseException;
 import org.apache.http.client.config.RequestConfig;
@@ -67,10 +63,7 @@ public class HttpTask extends AbstractTask {
      */
     private HttpParameters httpParameters;
 
-    /**
-     *  process service
-     */
-    private ProcessService processService;
+
 
     /**
      * Convert mill seconds to second unit
@@ -146,7 +139,6 @@ public class HttpTask extends AbstractTask {
      */
     protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
         RequestBuilder builder = createRequestBuilder();
-        ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
 
         // replace placeholder
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index 3923e7c..fbc7e21 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskProps;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
index 5c351d4..82cb6a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
@@ -31,10 +31,9 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index ada6b70..7a66227 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.python.PythonParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 
 import java.util.Map;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 68b9b04..9fa2abe 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -55,11 +52,6 @@ public class ShellTask extends AbstractTask {
   private ShellParameters shellParameters;
 
   /**
-   * task dir
-   */
-  private String taskDir;
-
-  /**
    * shell command executor
    */
   private ShellCommandExecutor shellCommandExecutor;
@@ -122,7 +114,10 @@ public class ShellTask extends AbstractTask {
    */
   private String buildCommand() throws Exception {
     // generate scripts
-    String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
+    String fileName = String.format("%s/%s_node.sh",
+            taskExecutionContext.getExecutePath(),
+            taskExecutionContext.getTaskAppId());
+
     Path path = new File(fileName).toPath();
 
     if (Files.exists(path)) {
@@ -148,7 +143,7 @@ public class ShellTask extends AbstractTask {
     shellParameters.setRawScript(script);
 
     logger.info("raw script : {}", shellParameters.getRawScript());
-    logger.info("task dir : {}", taskDir);
+    logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
 
     Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
     FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 4bb91dd..e25cffb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -24,11 +24,10 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 8775830..9a45c7d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.serializer.SerializerFeature;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.dolphinscheduler.alert.utils.MailUtils;
 import org.apache.dolphinscheduler.common.Constants;
@@ -33,18 +32,15 @@ import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.UDFUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.permission.PermissionCheck;
-import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 
 import java.sql.*;
@@ -64,22 +60,10 @@ public class SqlTask extends AbstractTask {
      *  sql parameters
      */
     private SqlParameters sqlParameters;
-
-    /**
-     *  process service
-     */
-    private ProcessService processService;
-
     /**
      *  alert dao
      */
     private AlertDao alertDao;
-
-    /**
-     * datasource
-     */
-    private DataSource dataSource;
-
     /**
      * base datasource
      */
@@ -102,7 +86,7 @@ public class SqlTask extends AbstractTask {
         if (!sqlParameters.checkParameters()) {
             throw new RuntimeException("sql task params is not valid");
         }
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
+
         this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
     }
 
@@ -111,6 +95,7 @@ public class SqlTask extends AbstractTask {
         // set the name of the current thread
         String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
         Thread.currentThread().setName(threadLoggerInfoName);
+
         logger.info("Full sql parameters: {}", sqlParameters);
         logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
                 sqlParameters.getType(),
@@ -121,37 +106,15 @@ public class SqlTask extends AbstractTask {
                 sqlParameters.getShowType(),
                 sqlParameters.getConnParams());
 
-        // not set data source
-        if (sqlParameters.getDatasource() == 0){
-            logger.error("datasource id not exists");
-            exitStatusCode = -1;
-            return;
-        }
-
-        dataSource= processService.findDataSourceById(sqlParameters.getDatasource());
-
-        // data source is null
-        if (dataSource == null){
-            logger.error("datasource not exists");
-            exitStatusCode = -1;
-            return;
-        }
-
-        logger.info("datasource name : {} , type : {} , desc : {}  , user_id : {} , parameter : {}",
-                dataSource.getName(),
-                dataSource.getType(),
-                dataSource.getNote(),
-                dataSource.getUserId(),
-                dataSource.getConnectionParams());
-
         Connection con = null;
         List<String> createFuncs = null;
         try {
             // load class
-            DataSourceFactory.loadClass(dataSource.getType());
+            DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
+
             // get datasource
-            baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
-                    dataSource.getConnectionParams());
+            baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
+                    sqlParameters.getConnParams());
 
             // ready to execute SQL and parameter entity Map
             SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
@@ -175,9 +138,8 @@ public class SqlTask extends AbstractTask {
                 for(int i=0;i<ids.length;i++){
                     idsArray[i]=Integer.parseInt(ids[i]);
                 }
-
-                List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(idsArray);
-                createFuncs = UDFUtils.createFuncs(udfFuncList, taskExecutionContext.getTenantCode(), logger);
+                SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
+                createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
             }
 
             // execute sql task
@@ -262,7 +224,7 @@ public class SqlTask extends AbstractTask {
             CommonUtils.loadKerberosConf();
 
             // if hive , load connection params if exists
-            if (HIVE == dataSource.getType()) {
+            if (HIVE == DbType.valueOf(sqlParameters.getType())) {
                 Properties paramProp = new Properties();
                 paramProp.setProperty(USER, baseDataSource.getUser());
                 paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
@@ -387,10 +349,7 @@ public class SqlTask extends AbstractTask {
      */
     public void sendAttachment(String title,String content){
 
-        //  process instance
-        ProcessInstance instance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
-
-        List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
+        List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
 
         // receiving group list
         List<String> receviersList = new ArrayList<String>();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 05a0790..77d2139 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.DaoFactory;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -236,6 +236,8 @@ public class ZKMasterClient extends AbstractZKClient {
 
 	/**
 	 * monitor master
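+	 * @param event tree cache event; its type selects how the master change is handled
+	 * @param path path supplied with the event (presumably the affected ZooKeeper node path)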
 	 */
 	public void handleMasterEvent(TreeCacheEvent event, String path){
 		switch (event.getType()) {
@@ -256,6 +258,8 @@ public class ZKMasterClient extends AbstractZKClient {
 
 	/**
 	 * monitor worker
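+	 * @param event tree cache event; its type selects how the worker change is handled
+	 * @param path path supplied with the event (presumably the affected ZooKeeper node path)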
 	 */
 	public void handleWorkerEvent(TreeCacheEvent event, String path){
 		switch (event.getType()) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
index 33990bc..a1d70f8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
@@ -72,6 +72,8 @@ public class ZKWorkerClient extends AbstractZKClient {
 
 	/**
 	 * monitor worker
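+	 * @param event tree cache event; its type selects how the worker change is handled
+	 * @param path path supplied with the event (presumably the affected ZooKeeper node path)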
 	 */
 	public void handleWorkerEvent(TreeCacheEvent event, String path){
 		switch (event.getType()) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a26044e..55cd634 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1411,8 +1411,12 @@ public class ProcessService {
      */
     public void changeTaskState(ExecutionStatus state,
                                 Date endTime,
+                                int processId,
+                                String appIds,
                                 int taskInstId) {
         TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
+        taskInstance.setPid(processId);
+        taskInstance.setAppLink(appIds);
         taskInstance.setState(state);
         taskInstance.setEndTime(endTime);
         saveTaskInstance(taskInstance);