You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/02/21 12:18:12 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: 1, master persistent task 2. extract master and worker communication model (#1992)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new ff86dc7  1, master persistent task 2. extract  master and worker communication model (#1992)
ff86dc7 is described below

commit ff86dc7d57e42d62c8237a1bb44b16d473d23475
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Fri Feb 21 20:18:02 2020 +0800

    1, master persistent task 2. extract  master and worker communication model (#1992)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
---
 .../dolphinscheduler/common/enums/CommandType.java |   9 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |   5 +-
 .../dao/mapper/DataSourceMapper.java               |   2 +
 .../remote/command/ExecuteTaskRequestCommand.java  |   2 +-
 .../dolphinscheduler/remote/command/TaskInfo.java  | 250 +++++++++++++++++++++
 .../server/master/MasterServer.java                |   1 +
 .../master/processor/TaskResponseProcessor.java    |   7 +
 .../master/runner/MasterBaseTaskExecThread.java    | 104 ++++++++-
 .../worker/processor/WorkerRequestProcessor.java   |  64 ++----
 .../server/worker/runner/TaskScheduleThread.java   | 120 ++++------
 10 files changed, 436 insertions(+), 128 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
index 1ee7915..56fdd07 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
@@ -65,4 +65,13 @@ public enum CommandType {
     public String getDescp() {
         return descp;
     }
+
+    public static CommandType of(Integer status){
+        for(CommandType cmdType : values()){
+            if(cmdType.getCode() == status){
+                return cmdType;
+            }
+        }
+        throw new IllegalArgumentException("invalid status : " + status);
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 2db1eda..c9481ba 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -27,13 +27,14 @@ import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 
+import java.io.Serializable;
 import java.util.Date;
 
 /**
  * task instance
  */
 @TableName("t_ds_task_instance")
-public class TaskInstance {
+public class TaskInstance implements Serializable {
 
     /**
      * id
@@ -198,7 +199,7 @@ public class TaskInstance {
 
 
 
-    public void  init(String host,Date startTime,String executePath){
+    public void init(String host,Date startTime,String executePath){
         this.host = host;
         this.startTime = startTime;
         this.executePath = executePath;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
index f95fbc7..0c3238a 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
@@ -79,8 +79,10 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
 
     /**
      * list authorized UDF function
+     *
      * @param userId userId
      * @param dataSourceIds data source id array
+     * @param <T> T
      * @return UDF function list
      */
     <T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
index 4d01142..e1556f3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
@@ -1 +1 @@
-/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
public class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task instance json
     */
    private String taskInstanceJson;

    public String getTaskInstanceJson() {
        return taskInstanceJson;
    }

    public void setTaskInstanceJson(String taskInstanceJson) {
        this.taskInstanceJson = taskInstanceJson;
    }

    public ExecuteTaskRequestCommand() {
    }

    public ExecuteTaskRequestCommand(String taskInstanceJson) {
        this.taskInstanceJson = taskInstanceJson;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskRequestCommand{" +
                "taskInstanceJson='" + taskInstanceJson + '\'' +
                '}';
    }
}
\ No newline at end of file
+/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.remote.command;

import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;

import java.io.Serializable;

/**
 *  execute task request command
 */
public class ExecuteTaskRequestCommand implements Serializable {

    /**
     *  task instance json
     */
    private String taskInfoJson;

    public String getTaskInfoJson() {
        return taskInfoJson;
    }

    public void setTaskInfoJson(String taskInfoJson) {
        this.taskInfoJson = taskInfoJson;
    }

    public ExecuteTaskRequestCommand() {
    }

    public ExecuteTaskRequestCommand(String taskInfoJson) {
        this.taskInfoJson = taskInfoJson;
    }

    /**
     *  package request command
     *
     * @return command
     */
    public Command convert2Command(){
        Command command = new Command();
        command.setType(CommandType.EXECUTE_TASK_REQUEST);
        byte[] body = FastJsonSerializer.serialize(this);
        command.setBody(body);
        return command;
    }

    @Override
    public String toString() {
        return "ExecuteTaskRequestCommand{" +
                "taskInfoJson='" + taskInfoJson + '\'' +
                '}';
    }
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
new file mode 100644
index 0000000..3fb58fe
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ *  master/worker task transport
+ */
+public class TaskInfo implements Serializable{
+
+    /**
+     *  task instance id
+     */
+    private Integer taskId;
+
+
+    /**
+     *  task name
+     */
+    private String taskName;
+
+    /**
+     *  task start time
+     */
+    private Date startTime;
+
+    /**
+     *  task type
+     */
+    private String taskType;
+
+    /**
+     *  task execute path
+     */
+    private String executePath;
+
+    /**
+     *  task json
+     */
+    private String taskJson;
+
+
+    /**
+     *  process instance id
+     */
+    private Integer processInstanceId;
+
+
+    /**
+     *  process instance schedule time
+     */
+    private Date scheduleTime;
+
+    /**
+     *  process instance global parameters
+     */
+    private String globalParams;
+
+
+    /**
+     *  execute user id
+     */
+    private Integer executorId;
+
+
+    /**
+     *  command type if complement
+     */
+    private Integer cmdTypeIfComplement;
+
+
+    /**
+     *  tenant code
+     */
+    private String tenantCode;
+
+    /**
+     *  task queue
+     */
+    private String queue;
+
+
+    /**
+     *  process define id
+     */
+    private Integer processDefineId;
+
+    /**
+     *  project id
+     */
+    private Integer projectId;
+
+    public Integer getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(Integer taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getTaskName() {
+        return taskName;
+    }
+
+    public void setTaskName(String taskName) {
+        this.taskName = taskName;
+    }
+
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    public String getTaskType() {
+        return taskType;
+    }
+
+    public void setTaskType(String taskType) {
+        this.taskType = taskType;
+    }
+
+    public String getExecutePath() {
+        return executePath;
+    }
+
+    public void setExecutePath(String executePath) {
+        this.executePath = executePath;
+    }
+
+    public String getTaskJson() {
+        return taskJson;
+    }
+
+    public void setTaskJson(String taskJson) {
+        this.taskJson = taskJson;
+    }
+
+    public Integer getProcessInstanceId() {
+        return processInstanceId;
+    }
+
+    public void setProcessInstanceId(Integer processInstanceId) {
+        this.processInstanceId = processInstanceId;
+    }
+
+    public Date getScheduleTime() {
+        return scheduleTime;
+    }
+
+    public void setScheduleTime(Date scheduleTime) {
+        this.scheduleTime = scheduleTime;
+    }
+
+    public String getGlobalParams() {
+        return globalParams;
+    }
+
+    public void setGlobalParams(String globalParams) {
+        this.globalParams = globalParams;
+    }
+
+    public String getTenantCode() {
+        return tenantCode;
+    }
+
+    public void setTenantCode(String tenantCode) {
+        this.tenantCode = tenantCode;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public Integer getProcessDefineId() {
+        return processDefineId;
+    }
+
+    public void setProcessDefineId(Integer processDefineId) {
+        this.processDefineId = processDefineId;
+    }
+
+    public Integer getProjectId() {
+        return projectId;
+    }
+
+    public void setProjectId(Integer projectId) {
+        this.projectId = projectId;
+    }
+
+    public Integer getExecutorId() {
+        return executorId;
+    }
+
+    public void setExecutorId(Integer executorId) {
+        this.executorId = executorId;
+    }
+
+    public Integer getCmdTypeIfComplement() {
+        return cmdTypeIfComplement;
+    }
+
+    public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
+        this.cmdTypeIfComplement = cmdTypeIfComplement;
+    }
+
+    @Override
+    public String toString() {
+        return "TaskInfo{" +
+                "taskId=" + taskId +
+                ", taskName='" + taskName + '\'' +
+                ", startTime=" + startTime +
+                ", taskType='" + taskType + '\'' +
+                ", executePath='" + executePath + '\'' +
+                ", taskJson='" + taskJson + '\'' +
+                ", processInstanceId=" + processInstanceId +
+                ", scheduleTime=" + scheduleTime +
+                ", globalParams='" + globalParams + '\'' +
+                ", executorId=" + executorId +
+                ", cmdTypeIfComplement=" + cmdTypeIfComplement +
+                ", tenantCode='" + tenantCode + '\'' +
+                ", queue='" + queue + '\'' +
+                ", processDefineId=" + processDefineId +
+                ", projectId=" + projectId +
+                '}';
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 0a153e7..d0c7bc2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -118,6 +118,7 @@ public class MasterServer implements IStoppable {
         //
         //init remoting server
         NettyServerConfig serverConfig = new NettyServerConfig();
+        serverConfig.setListenPort(45678);
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
         this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
         this.nettyRemotingServer.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 0dd45f0..c3b6a05 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -46,6 +46,13 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
         this.processService = processService;
     }
 
+    /**
+     * task final result response
+     * need master process , state persistence
+     *
+     * @param channel channel
+     * @param command command
+     */
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index a261b34..4ae057a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -17,14 +17,18 @@
 package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.utils.BeanContext;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
 import org.apache.dolphinscheduler.remote.utils.Address;
@@ -124,11 +128,23 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
     // TODO send task to worker
     public void sendToWorker(TaskInstance taskInstance){
         final Address address = new Address("127.0.0.1", 12346);
-        ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
+
+        /**
+         *  set taskInstance relation
+         */
+        TaskInstance destTaskInstance = setTaskInstanceRelation(taskInstance);
+
+        ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
+                FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
         try {
-            Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
-            ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class);
+            Command responseCommand = nettyRemotingClient.sendSync(address,
+                    taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
+
+            ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
+                    responseCommand.getBody(), ExecuteTaskAckCommand.class);
+
             logger.info("taskAckCommand : {}",taskAckCommand);
+
             processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
                     taskAckCommand.getStartTime(),
                     taskAckCommand.getHost(),
@@ -141,6 +157,88 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
         }
     }
 
+
+    /**
+     *  set task instance relation
+     *
+     * @param taskInstance taskInstance
+     */
+    private TaskInstance setTaskInstanceRelation(TaskInstance taskInstance){
+        taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
+
+        int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
+        // verify tenant is null
+        if (verifyTenantIsNull(tenant, taskInstance)) {
+            processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
+            return null;
+        }
+        // set queue for process instance, user-specified queue takes precedence over tenant queue
+        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
+        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
+
+        return taskInstance;
+    }
+
+
+    /**
+     *  whether tenant is null
+     * @param tenant tenant
+     * @param taskInstance taskInstance
+     * @return result
+     */
+    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
+        if(tenant == null){
+            logger.error("tenant not exists,process instance id : {},task instance id : {}",
+                    taskInstance.getProcessInstance().getId(),
+                    taskInstance.getId());
+            return true;
+        }
+        return false;
+    }
+
+
+    /**
+     * taskInstance convert to taskInfo
+     *
+     * @param taskInstance taskInstance
+     * @return taskInfo
+     */
+    private TaskInfo convertToTaskInfo(TaskInstance taskInstance){
+        TaskInfo taskInfo = new TaskInfo();
+        taskInfo.setTaskId(taskInstance.getId());
+        taskInfo.setTaskName(taskInstance.getName());
+        taskInfo.setStartTime(taskInstance.getStartTime());
+        taskInfo.setTaskType(taskInstance.getTaskType());
+        taskInfo.setExecutePath(getExecLocalPath(taskInstance));
+        taskInfo.setTaskJson(taskInstance.getTaskJson());
+        taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId());
+        taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
+        taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
+        taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
+        taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
+        taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
+        taskInfo.setQueue(taskInstance.getProcessInstance().getQueue());
+        taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId());
+        taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId());
+
+        return taskInfo;
+    }
+
+
+    /**
+     * get execute local path
+     *
+     * @return execute local path
+     */
+    private String getExecLocalPath(TaskInstance taskInstance){
+        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+                taskInstance.getProcessDefine().getId(),
+                taskInstance.getProcessInstance().getId(),
+                taskInstance.getId());
+    }
+
     /**
      * submit master base task exec thread
      * @return TaskInstance
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
index 8ea9ccb..ba21494 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -85,71 +85,39 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
         ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
                 command.getBody(), ExecuteTaskRequestCommand.class);
 
-        String taskInstanceJson = taskRequestCommand.getTaskInstanceJson();
+        String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
 
-        TaskInstance taskInstance = JSONObject.parseObject(taskInstanceJson, TaskInstance.class);
-
-        taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
-
-
-        //TODO this logic need add to master
-        int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
-        Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
-        // verify tenant is null
-        if (verifyTenantIsNull(tenant, taskInstance)) {
-            processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
-            return;
-        }
-        // set queue for process instance, user-specified queue takes precedence over tenant queue
-        String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
-        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
-        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
-        //TODO end
+        TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class);
 
         // local execute path
-        String execLocalPath = getExecLocalPath(taskInstance);
+        String execLocalPath = getExecLocalPath(taskInfo);
         logger.info("task instance  local execute path : {} ", execLocalPath);
-        // init task
-        taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
+
         try {
-            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
+            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode());
         } catch (Exception ex){
             logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
         }
 
-        taskCallbackService.addCallbackChannel(taskInstance.getId(),
+        taskCallbackService.addCallbackChannel(taskInfo.getTaskId(),
                 new CallbackChannel(channel, command.getOpaque()));
 
         // submit task
-        workerExecService.submit(new TaskScheduleThread(taskInstance,
+        workerExecService.submit(new TaskScheduleThread(taskInfo,
                 processService, taskCallbackService));
     }
 
-    /**
-     *  whehter tenant is null
-     * @param tenant tenant
-     * @param taskInstance taskInstance
-     * @return result
-     */
-    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
-        if(tenant == null){
-            logger.error("tenant not exists,process instance id : {},task instance id : {}",
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId());
-            return true;
-        }
-        return false;
-    }
 
     /**
-     *  get execute local path
-     * @param taskInstance taskInstance
+     * get execute local path
+     *
+     * @param taskInfo taskInfo
      * @return execute local path
      */
-    private String getExecLocalPath(TaskInstance taskInstance){
-        return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
-                taskInstance.getProcessDefine().getId(),
-                taskInstance.getProcessInstance().getId(),
-                taskInstance.getId());
+    private String getExecLocalPath(TaskInfo taskInfo){
+        return FileUtils.getProcessExecDir(taskInfo.getProjectId(),
+                taskInfo.getProcessDefineId(),
+                taskInfo.getProcessInstanceId(),
+                taskInfo.getTaskId());
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index 349e762..0de8ea3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -22,6 +22,7 @@ import ch.qos.logback.classic.sift.SiftingAppender;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
@@ -29,14 +30,12 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
+import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.TaskInfo;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
@@ -64,7 +63,7 @@ public class TaskScheduleThread implements Runnable {
     /**
      *  task instance
      */
-    private TaskInstance taskInstance;
+    private TaskInfo taskInfo;
 
     /**
      *  process service
@@ -82,72 +81,69 @@ public class TaskScheduleThread implements Runnable {
     private TaskCallbackService taskInstanceCallbackService;
 
     /**
-     * constructor
+     *  constructor
      *
-     * @param taskInstance  task instance
-     * @param processService    process dao
+     * @param taskInfo task info describing the task to execute
+     * @param processService process service
+     * @param taskInstanceCallbackService callback service used to send ack and result commands
      */
-    public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
+    public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
         this.processService = processService;
-        this.taskInstance = taskInstance;
+        this.taskInfo = taskInfo;
         this.taskInstanceCallbackService = taskInstanceCallbackService;
     }
 
     @Override
     public void run() {
 
-        ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
+        ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInfo.getTaskId());
 
         try {
             // tell master that task is in executing
-            ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
-            taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
+            ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType());
+            taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand);
 
-            logger.info("script path : {}", taskInstance.getExecutePath());
+            logger.info("script path : {}", taskInfo.getExecutePath());
             // task node
-            TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
+            TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class);
 
             // get resource files
             List<String> resourceFiles = createProjectResFiles(taskNode);
             // copy hdfs/minio file to local
             downloadResource(
-                    taskInstance.getExecutePath(),
+                    taskInfo.getExecutePath(),
                     resourceFiles,
                     logger);
 
-
-            // get process instance according to tak instance
-            ProcessInstance processInstance = taskInstance.getProcessInstance();
-
             // set task props
             TaskProps taskProps = new TaskProps(taskNode.getParams(),
-                    taskInstance.getExecutePath(),
-                    processInstance.getScheduleTime(),
-                    taskInstance.getName(),
-                    taskInstance.getTaskType(),
-                    taskInstance.getId(),
+                    taskInfo.getExecutePath(),
+                    taskInfo.getScheduleTime(),
+                    taskInfo.getTaskName(),
+                    taskInfo.getTaskType(),
+                    taskInfo.getTaskId(),
                     CommonUtils.getSystemEnvPath(),
-                    processInstance.getTenantCode(),
-                    processInstance.getQueue(),
-                    taskInstance.getStartTime(),
+                    taskInfo.getTenantCode(),
+                    taskInfo.getQueue(),
+                    taskInfo.getStartTime(),
                     getGlobalParamsMap(),
-                    taskInstance.getDependency(),
-                    processInstance.getCmdTypeIfComplement());
+                    null,
+                    CommandType.of(taskInfo.getCmdTypeIfComplement()));
             // set task timeout
             setTaskTimeout(taskProps, taskNode);
 
             taskProps.setTaskAppId(String.format("%s_%s_%s",
-                    taskInstance.getProcessDefine().getId(),
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId()));
+                    taskInfo.getProcessDefineId(),
+                    taskInfo.getProcessInstanceId(),
+                    taskInfo.getTaskId()));
 
             // custom logger
             Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-                    taskInstance.getProcessDefine().getId(),
-                    taskInstance.getProcessInstance().getId(),
-                    taskInstance.getId()));
+                    taskInfo.getProcessDefineId(),
+                    taskInfo.getProcessInstanceId(),
+                    taskInfo.getTaskId()));
 
-            task = TaskManager.newTask(taskInstance.getTaskType(),
+            task = TaskManager.newTask(taskInfo.getTaskType(),
                     taskProps,
                     taskLogger);
 
@@ -163,14 +159,14 @@ public class TaskScheduleThread implements Runnable {
             //
             responseCommand.setStatus(task.getExitStatus().getCode());
             responseCommand.setEndTime(new Date());
-            logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
+            logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus());
         }catch (Exception e){
             logger.error("task scheduler failure", e);
             kill();
             responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
             responseCommand.setEndTime(new Date());
         } finally {
-            taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
+            taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand);
         }
     }
 
@@ -182,7 +178,7 @@ public class TaskScheduleThread implements Runnable {
         Map<String,String> globalParamsMap = new HashMap<>(16);
 
         // global params string
-        String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
+        String globalParamsStr = taskInfo.getGlobalParams();
 
         if (globalParamsStr != null) {
             List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
@@ -192,18 +188,18 @@ public class TaskScheduleThread implements Runnable {
     }
     /**
      *  build ack command
-     * @param taskType
+     * @param taskType task type name (SQL and PROCEDURE tasks report no execute path)
      */
     private ExecuteTaskAckCommand buildAckCommand(String taskType) {
         ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
         ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
         ackCommand.setLogPath(getTaskLogPath());
-        ackCommand.setHost("localhost");
+        ackCommand.setHost(OSUtils.getHost());
         ackCommand.setStartTime(new Date());
         if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
             ackCommand.setExecutePath(null);
         }else{
-            ackCommand.setExecutePath(taskInstance.getExecutePath());
+            ackCommand.setExecutePath(taskInfo.getExecutePath());
         }
         return ackCommand;
     }
@@ -219,15 +215,15 @@ public class TaskScheduleThread implements Runnable {
                 .getDiscriminator()).getLogBase();
         if (baseLog.startsWith(Constants.SINGLE_SLASH)){
             return baseLog + Constants.SINGLE_SLASH +
-                    taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH  +
-                    taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH  +
-                    taskInstance.getId() + ".log";
+                    taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH  +
+                    taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH  +
+                    taskInfo.getTaskId() + ".log";
         }
         return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
                 baseLog +  Constants.SINGLE_SLASH +
-                taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH  +
-                taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH  +
-                taskInstance.getId() + ".log";
+                taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH  +
+                taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH  +
+                taskInfo.getTaskId() + ".log";
     }
 
     /**
@@ -333,33 +329,9 @@ public class TaskScheduleThread implements Runnable {
      * @throws Exception exception
      */
     private void checkDownloadPermission(List<String> projectRes) throws Exception {
-        int userId = taskInstance.getProcessInstance().getExecutorId();
+        int userId = taskInfo.getExecutorId();
         String[] resNames = projectRes.toArray(new String[projectRes.size()]);
         PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
         permissionCheck.checkPermission();
     }
-
-    /**
-     *  update task state according to task type
-     * @param taskType
-     */
-    private void updateTaskState(String taskType) {
-        // update task status is running
-        if(taskType.equals(TaskType.SQL.name())  ||
-                taskType.equals(TaskType.PROCEDURE.name())){
-            processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    null,
-                    getTaskLogPath(),
-                    taskInstance.getId());
-        }else{
-            processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
-                    taskInstance.getStartTime(),
-                    taskInstance.getHost(),
-                    taskInstance.getExecutePath(),
-                    getTaskLogPath(),
-                    taskInstance.getId());
-        }
-    }
 }
\ No newline at end of file