You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/04 14:22:04 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#2081)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 15bf740  Refactor worker (#2081)
15bf740 is described below

commit 15bf740221a3a1731360dfbae0618f7623ba471d
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Mar 4 22:21:54 2020 +0800

    Refactor worker (#2081)
    
    * refactor kill logic
    
    * refactor ExecutionContext
---
 .../server/entity/TaskExecutionContext.java        | 17 +++++++
 .../server/master/dispatch/ExecutorDispatcher.java |  7 +--
 .../master/dispatch/context/ExecutionContext.java  | 35 +++++++++----
 .../dispatch/executor/NettyExecutorManager.java    | 57 ++--------------------
 .../master/runner/MasterBaseTaskExecThread.java    |  9 ++--
 .../server/master/runner/MasterTaskExecThread.java |  3 +-
 6 files changed, 53 insertions(+), 75 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 2348b47..03ce0ee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -17,6 +17,11 @@
 
 package org.apache.dolphinscheduler.server.entity;
 
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
 import java.io.Serializable;
 import java.util.Date;
 import java.util.Map;
@@ -397,6 +402,18 @@ public class TaskExecutionContext implements Serializable{
         this.dataxTaskExecutionContext = dataxTaskExecutionContext;
     }
 
+    public Command toCommand(){
+        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
+        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
+        return requestCommand.convert2Command();
+    }
+
+    public Command toKillCommand(){
+        TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
+        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
+        return requestCommand.convert2Command();
+    }
+
     @Override
     public String toString() {
         return "TaskExecutionContext{" +
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 97b489e..38914e5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.dispatch;
 
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -45,9 +44,6 @@ public class ExecutorDispatcher implements InitializingBean {
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
 
-    @Autowired
-    private MasterConfig masterConfig;
-
     /**
      * round robin host manager
      */
@@ -87,10 +83,9 @@ public class ExecutorDispatcher implements InitializingBean {
          */
         Host host = hostManager.select(context);
         if (StringUtils.isEmpty(host.getAddress())) {
-            throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
+            throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getCommand()));
         }
         context.setHost(host);
-        context.getContext().setHost(host.getAddress());
         executorManager.beforeExecute(context);
         try {
             /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 19124d3..9d04511 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,8 +17,8 @@
 package org.apache.dolphinscheduler.server.master.dispatch.context;
 
 
+import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 
 /**
@@ -32,30 +32,47 @@ public class ExecutionContext {
     private Host host;
 
     /**
-     *  context
+     *  command
      */
-    private final TaskExecutionContext context;
+    private final Command command;
 
     /**
      *  executor type : worker or client
      */
     private final ExecutorType executorType;
 
-    public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) {
-        this.context = context;
+    /**
+     *  worker group
+     */
+    private String workerGroup;
+
+
+    public ExecutionContext(Command command, ExecutorType executorType) {
+        this.command = command;
         this.executorType = executorType;
     }
 
-    public String getWorkerGroup(){
-        return context.getWorkerGroup();
+    public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
+        this.command = command;
+        this.executorType = executorType;
+        this.workerGroup = workerGroup;
+    }
+
+    public Command getCommand() {
+        return command;
     }
 
     public ExecutorType getExecutorType() {
         return executorType;
     }
 
-    public TaskExecutionContext getContext() {
-        return context;
+    public void setWorkerGroup(String workerGroup) {
+        this.workerGroup = workerGroup;
+    }
+
+
+    public String getWorkerGroup(){
+        return this.workerGroup;
     }
 
     public Host getHost() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 7719cf8..e3d45f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -21,12 +21,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -98,7 +94,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
         /**
          *  build command accord executeContext
          */
-        Command command = buildCommand(context);
+        Command command = context.getCommand();
 
         /**
          * execute task host
@@ -111,14 +107,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
                 success = true;
                 context.setHost(host);
             } catch (ExecuteException ex) {
-                logger.error(String.format("execute context : %s error", context.getContext()), ex);
+                logger.error(String.format("execute command : %s error", command), ex);
                 try {
                     failNodeSet.add(host.getAddress());
                     Set<String> tmpAllIps = new HashSet<>(allNodes);
                     Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                     if (remained != null && remained.size() > 0) {
                         host = Host.of(remained.iterator().next());
-                        logger.error("retry execute context : {} host : {}", context.getContext(), host);
+                        logger.error("retry execute command : {} host : {}", command, host);
                     } else {
                         throw new ExecuteException("fail after try all nodes");
                     }
@@ -133,53 +129,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
 
     @Override
     public void executeDirectly(ExecutionContext context) throws ExecuteException {
-        Command command = buildKillCommand(context);
         Host host = context.getHost();
-        doExecute(host,command);
-    }
-
-    /**
-     *  build command
-     * @param context context
-     * @return command
-     */
-    private Command buildCommand(ExecutionContext context) {
-        TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
-        ExecutorType executorType = context.getExecutorType();
-        switch (executorType){
-            case WORKER:
-                TaskExecutionContext taskExecutionContext = context.getContext();
-                requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
-                break;
-            case CLIENT:
-                break;
-            default:
-                throw new IllegalArgumentException("invalid executor type : " + executorType);
-
-        }
-        return requestCommand.convert2Command();
-    }
-
-    /**
-     *  build command
-     * @param context context
-     * @return command
-     */
-    private Command buildKillCommand(ExecutionContext context) {
-        TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
-        ExecutorType executorType = context.getExecutorType();
-        switch (executorType){
-            case WORKER:
-                TaskExecutionContext taskExecutionContext = context.getContext();
-                requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
-                break;
-            case CLIENT:
-                break;
-            default:
-                throw new IllegalArgumentException("invalid executor type : " + executorType);
-
-        }
-        return requestCommand.convert2Command();
+        doExecute(host, context.getCommand());
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 9d40de9..71bb8f8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
 
-import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
 
 /**
  * master task exec base class
@@ -131,7 +130,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      */
     private Boolean dispatch(TaskInstance taskInstance){
         TaskExecutionContext context = getTaskExecutionContext(taskInstance);
-        ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
+        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
         try {
             return dispatcher.dispatch(executionContext);
         } catch (ExecuteException e) {
@@ -227,8 +226,8 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
                     }
                 }
                 if(submitDB && !submitTask){
-                    // dispatcht task
-                    submitTask = dispatchtTask(task);
+                    // dispatch task
+                    submitTask = dispatchTask(task);
                 }
                 if(submitDB && submitTask){
                     return task;
@@ -254,7 +253,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      * @param taskInstance taskInstance
      * @return whether submit task success
      */
-    public Boolean dispatchtTask(TaskInstance taskInstance) {
+    public Boolean dispatchTask(TaskInstance taskInstance) {
 
         try{
             if(taskInstance.isSubProcess()){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index a196832..fb3f8e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@@ -184,7 +183,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
         alreadyKilled = true;
 
         TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
-        ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
+        ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
 
         Host host = Host.of(taskInstance.getHost());
         executionContext.setHost(host);