You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/25 06:01:04 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: ExecutorManager interface add generic type (#2012)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 33bf550  ExecutorManager interface add generic type (#2012)
33bf550 is described below

commit 33bf550a33caa47b701b43a787e362dc7a4601f4
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Feb 25 14:00:59 2020 +0800

    ExecutorManager interface add generic type (#2012)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
---
 .../server/master/dispatch/ExecutorDispatcher.java |  8 ++++----
 .../dispatch/executor/AbstractExecutorManager.java |  2 +-
 .../master/dispatch/executor/ExecutorManager.java  |  4 ++--
 .../dispatch/executor/NettyExecutorManager.java    | 12 +++++++++---
 .../master/runner/MasterBaseTaskExecThread.java    | 22 +++++-----------------
 5 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index 01fb840..8a803a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean {
     @Autowired
     private RoundRobinHostManager hostManager;
 
-    private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers;
+    private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
 
     public ExecutorDispatcher(){
         this.executorManagers = new ConcurrentHashMap<>();
@@ -61,11 +61,11 @@ public class ExecutorDispatcher implements InitializingBean {
      * @param context context
      * @throws ExecuteException
      */
-    public void dispatch(final ExecutionContext context) throws ExecuteException {
+    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
         /**
          * get executor manager
          */
-        ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
+        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
         if(executorManager == null){
             throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
         }
@@ -83,7 +83,7 @@ public class ExecutorDispatcher implements InitializingBean {
             /**
              * task execute
              */
-            executorManager.execute(context);
+            return executorManager.execute(context);
         } finally {
             executorManager.afterExecute(context);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index e1f0c3c..c0be5a8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 /**
  *  abstract executor manager
  */
-public abstract class AbstractExecutorManager implements ExecutorManager{
+public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
 
     /**
      * before execute , add time monitor , timeout
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 1d78d2f..9b0b9af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
 /**
  *  executor manager
  */
-public interface ExecutorManager {
+public interface ExecutorManager<T> {
 
     /**
      * before execute
@@ -37,7 +37,7 @@ public interface ExecutorManager {
      * @param context context
      * @throws ExecuteException
      */
-    void execute(ExecutionContext context) throws ExecuteException;
+    T execute(ExecutionContext context) throws ExecuteException;
 
     /**
      * after execute
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index cf1a264..bdfe71c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -46,7 +46,7 @@ import java.util.Set;
  *  netty executor manager
  */
 @Service
-public class NettyExecutorManager extends AbstractExecutorManager{
+public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
 
     private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
 
@@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{
     public NettyExecutorManager(){
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        /**
+         * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
+         * register EXECUTE_TASK_ACK command type TaskAckProcessor
+         */
         this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
         this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
     }
@@ -74,7 +78,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
      * @throws ExecuteException
      */
     @Override
-    public void execute(ExecutionContext context) throws ExecuteException {
+    public Boolean execute(ExecutionContext context) throws ExecuteException {
 
         /**
          *  all nodes
@@ -118,6 +122,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
                 }
             }
         }
+
+        return success;
     }
 
     /**
@@ -189,7 +195,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
                 break;
             case CLIENT:
                 break;
-             default:
+            default:
                 throw new IllegalArgumentException("invalid executor type : " + executorType);
 
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index f675493..9bf69dd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.utils.BeanContext;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.future.InvokeCallback;
-import org.apache.dolphinscheduler.remote.future.ResponseFuture;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.ITaskQueue;
 import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.concurrent.Callable;
 
@@ -138,14 +125,16 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
      * dispatch task to worker
      * @param taskInstance
      */
-    public void dispatch(TaskInstance taskInstance){
+    private Boolean dispatch(TaskInstance taskInstance){
         TaskExecutionContext context = getTaskExecutionContext(taskInstance);
         ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
         try {
-            dispatcher.dispatch(executionContext);
+            return dispatcher.dispatch(executionContext);
         } catch (ExecuteException e) {
             logger.error("execute exception", e);
+            return false;
         }
+
     }
 
     /**
@@ -234,8 +223,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
                 }
                 if(submitDB && !submitQueue){
                     // submit task to queue
-                    dispatch(task);
-                    submitQueue = true;
+                    submitQueue = dispatch(task);
                 }
                 if(submitDB && submitQueue){
                     return task;