You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/23 07:36:44 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#2000)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new c7be43c  Refactor worker (#2000)
c7be43c is described below

commit c7be43cbd4e276730a4e3b04ce086fe85dd10f86
Author: Tboy <gu...@immomo.com>
AuthorDate: Sun Feb 23 15:36:37 2020 +0800

    Refactor worker (#2000)
    
    * Refactor worker (#2)
    
    * Refactor worker (#1993)
    
    * Refactor worker (#1)
    
    * add TaskResponseProcessor (#1983)
    
    * 1. master persistent task 2. extract master and worker communication model (#1992)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * TaskExecutionContext create modify (#1994)
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * 1. master persistent task
    2. extract master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
    
    * updates
    
    * add register processor
    
    Co-authored-by: qiaozhanwei <qi...@outlook.com>
---
 .../java/org/apache/dolphinscheduler/server/master/MasterServer.java | 4 ++--
 .../server/master/dispatch/executor/NettyExecutorManager.java        | 5 +++++
 .../dolphinscheduler/server/master/processor/TaskAckProcessor.java   | 5 +++--
 .../server/master/processor/TaskResponseProcessor.java               | 5 +++--
 4 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 9493b72..7829347 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -120,8 +120,8 @@ public class MasterServer implements IStoppable {
         NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(45678);
         this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
-        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor(processService));
+        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
         this.nettyRemotingServer.start();
 
         //
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index dac8d79..e24bbe7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
@@ -28,6 +29,8 @@ import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
 import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
 import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +56,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
     public NettyExecutorManager(){
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
+        this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 1103b23..f5f2123 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +42,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
      */
     private final ProcessService processService;
 
-    public TaskAckProcessor(ProcessService processService){
-        this.processService = processService;
+    public TaskAckProcessor(){
+        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     @Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index b62bb77..bbc710c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +42,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
      */
     private final ProcessService processService;
 
-    public TaskResponseProcessor(ProcessService processService){
-        this.processService = processService;
+    public TaskResponseProcessor(){
+        this.processService = SpringApplicationContext.getBean(ProcessService.class);
     }
 
     /**