You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/03 14:18:57 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: host add host:port format (#2070)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 72e8f8d  host add host:port format (#2070)
72e8f8d is described below

commit 72e8f8d195ebd47869600017bc220f19088930fb
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 3 22:18:51 2020 +0800

    host add host:port format (#2070)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null, need to load from db
    
    * taskInstanceCache is null, need to load from db
    
    * taskInstanceCache is null, need to load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove Chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
---
 .../api/service/LoggerService.java                 |   5 +-
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  10 --
 .../builder/TaskExecutionContextBuilder.java       |   1 -
 .../server/entity/TaskExecutionContext.java        | 139 ++++++++++-----------
 .../server/master/dispatch/ExecutorDispatcher.java |   1 +
 .../server/master/runner/MasterTaskExecThread.java |   4 +-
 .../worker/processor/TaskExecuteProcessor.java     |   6 +-
 7 files changed, 77 insertions(+), 89 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 1f65208..91316af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class LoggerService {
       return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
     }
 
-    String host = taskInstance.getHost();
+    String host = Host.of(taskInstance.getHost()).getIp();
     if(StringUtils.isEmpty(host)){
       return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
     }
@@ -94,7 +95,7 @@ public class LoggerService {
     if (taskInstance == null){
       throw new RuntimeException("task instance is null");
     }
-    String host = taskInstance.getHost();
+    String host = Host.of(taskInstance.getHost()).getIp();
     return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
   }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 92cb3af..e444ad2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -186,21 +186,11 @@ public class TaskInstance implements Serializable {
     @TableField(exist = false)
     private String dependentResult;
 
-
     /**
      * worker group id
      */
     private int workerGroupId;
 
-
-
-    public void init(String host,Date startTime,String executePath){
-        this.host = host;
-        this.startTime = startTime;
-        this.executePath = executePath;
-    }
-
-
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 1388e79..34d96aa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -48,7 +48,6 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setLogPath(taskInstance.getLogPath());
         taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
-        taskExecutionContext.setHost(taskInstance.getHost());
         taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
         return this;
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index fb3aab9..2348b47 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -29,7 +29,7 @@ public class TaskExecutionContext implements Serializable{
     /**
      *  task id
      */
-    private Integer taskInstanceId;
+    private int taskInstanceId;
 
 
     /**
@@ -51,7 +51,7 @@ public class TaskExecutionContext implements Serializable{
      * host
      */
     private String host;
-
+    
     /**
      *  task execute path
      */
@@ -70,7 +70,7 @@ public class TaskExecutionContext implements Serializable{
     /**
      * processId
      */
-    private Integer processId;
+    private int processId;
 
     /**
      * appIds
@@ -80,7 +80,7 @@ public class TaskExecutionContext implements Serializable{
     /**
      *  process instance id
      */
-    private Integer processInstanceId;
+    private int processInstanceId;
 
 
     /**
@@ -97,13 +97,13 @@ public class TaskExecutionContext implements Serializable{
     /**
      *  execute user id
      */
-    private Integer executorId;
+    private int executorId;
 
 
     /**
      *  command type if complement
      */
-    private Integer cmdTypeIfComplement;
+    private int cmdTypeIfComplement;
 
 
     /**
@@ -120,12 +120,12 @@ public class TaskExecutionContext implements Serializable{
     /**
      *  process define id
      */
-    private Integer processDefineId;
+    private int processDefineId;
 
     /**
      *  project id
      */
-    private Integer projectId;
+    private int projectId;
 
     /**
      * taskParams
@@ -173,22 +173,11 @@ public class TaskExecutionContext implements Serializable{
      */
     private DataxTaskExecutionContext dataxTaskExecutionContext;
 
-
-
-
-    public String getWorkerGroup() {
-        return workerGroup;
-    }
-
-    public void setWorkerGroup(String workerGroup) {
-        this.workerGroup = workerGroup;
-    }
-
-    public Integer getTaskInstanceId() {
+    public int getTaskInstanceId() {
         return taskInstanceId;
     }
 
-    public void setTaskInstanceId(Integer taskInstanceId) {
+    public void setTaskInstanceId(int taskInstanceId) {
         this.taskInstanceId = taskInstanceId;
     }
 
@@ -216,6 +205,14 @@ public class TaskExecutionContext implements Serializable{
         this.taskType = taskType;
     }
 
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
     public String getExecutePath() {
         return executePath;
     }
@@ -224,6 +221,14 @@ public class TaskExecutionContext implements Serializable{
         this.executePath = executePath;
     }
 
+    public String getLogPath() {
+        return logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
     public String getTaskJson() {
         return taskJson;
     }
@@ -232,11 +237,27 @@ public class TaskExecutionContext implements Serializable{
         this.taskJson = taskJson;
     }
 
-    public Integer getProcessInstanceId() {
+    public int getProcessId() {
+        return processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public String getAppIds() {
+        return appIds;
+    }
+
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
+    public int getProcessInstanceId() {
         return processInstanceId;
     }
 
-    public void setProcessInstanceId(Integer processInstanceId) {
+    public void setProcessInstanceId(int processInstanceId) {
         this.processInstanceId = processInstanceId;
     }
 
@@ -256,6 +277,22 @@ public class TaskExecutionContext implements Serializable{
         this.globalParams = globalParams;
     }
 
+    public int getExecutorId() {
+        return executorId;
+    }
+
+    public void setExecutorId(int executorId) {
+        this.executorId = executorId;
+    }
+
+    public int getCmdTypeIfComplement() {
+        return cmdTypeIfComplement;
+    }
+
+    public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
+        this.cmdTypeIfComplement = cmdTypeIfComplement;
+    }
+
     public String getTenantCode() {
         return tenantCode;
     }
@@ -272,46 +309,22 @@ public class TaskExecutionContext implements Serializable{
         this.queue = queue;
     }
 
-    public Integer getProcessDefineId() {
+    public int getProcessDefineId() {
         return processDefineId;
     }
 
-    public void setProcessDefineId(Integer processDefineId) {
+    public void setProcessDefineId(int processDefineId) {
         this.processDefineId = processDefineId;
     }
 
-    public Integer getProjectId() {
+    public int getProjectId() {
         return projectId;
     }
 
-    public void setProjectId(Integer projectId) {
+    public void setProjectId(int projectId) {
         this.projectId = projectId;
     }
 
-    public Integer getExecutorId() {
-        return executorId;
-    }
-
-    public void setExecutorId(Integer executorId) {
-        this.executorId = executorId;
-    }
-
-    public Integer getCmdTypeIfComplement() {
-        return cmdTypeIfComplement;
-    }
-
-    public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
-        this.cmdTypeIfComplement = cmdTypeIfComplement;
-    }
-
-    public String getLogPath() {
-        return logPath;
-    }
-
-    public void setLogPath(String logPath) {
-        this.logPath = logPath;
-    }
-
     public String getTaskParams() {
         return taskParams;
     }
@@ -360,28 +373,12 @@ public class TaskExecutionContext implements Serializable{
         this.taskTimeout = taskTimeout;
     }
 
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public Integer getProcessId() {
-        return processId;
-    }
-
-    public void setProcessId(Integer processId) {
-        this.processId = processId;
-    }
-
-    public String getAppIds() {
-        return appIds;
+    public String getWorkerGroup() {
+        return workerGroup;
     }
 
-    public void setAppIds(String appIds) {
-        this.appIds = appIds;
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
     }
 
     public SQLTaskExecutionContext getSqlTaskExecutionContext() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index df563a6..97b489e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -90,6 +90,7 @@ public class ExecutorDispatcher implements InitializingBean {
             throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
         }
         context.setHost(host);
+        context.getContext().setHost(host.getAddress());
         executorManager.beforeExecute(context);
         try {
             /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 51a4f44..a196832 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -186,9 +186,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
         ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
 
-        Host host = new Host();
-        host.setIp(taskInstance.getHost());
-        host.setPort(12346);
+        Host host = Host.of(taskInstance.getHost());
         executionContext.setHost(host);
 
         nettyExecutorManager.executeDirectly(executionContext);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index b53ef17..5042c97 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -78,10 +78,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
                 String.format("invalid command type : %s", command.getType()));
-        logger.info("received command : {}", command);
+
         TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
                 command.getBody(), TaskExecuteRequestCommand.class);
 
+        logger.info("received command : {}", taskRequestCommand);
+
         String contextJson = taskRequestCommand.getTaskExecutionContext();
 
         TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
@@ -141,7 +143,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
         ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
         ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
-        ackCommand.setHost(OSUtils.getHost());
+        ackCommand.setHost(taskExecutionContext.getHost());
         ackCommand.setStartTime(new Date());
         if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
             ackCommand.setExecutePath(null);