You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/09 02:29:45 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: Refactor worker (#2121)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new fd6f13f  Refactor worker (#2121)
fd6f13f is described below

commit fd6f13fff79664648c141832534d825c55139f63
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Mar 9 10:29:38 2020 +0800

    Refactor worker (#2121)
    
    * refactor worker registry
    
    * refactor master server
    
    * refactor MasterSchedulerService
---
 .../server/master/MasterServer.java                |  7 ++++++
 .../master/runner/MasterSchedulerService.java      | 26 ++++++++++------------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 212e5d9..0f3656b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
 import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
+import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
@@ -80,6 +81,9 @@ public class MasterServer {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    @Autowired
+    private MasterSchedulerService masterSchedulerService;
+
     /**
      * master server startup
      *
@@ -109,6 +113,8 @@ public class MasterServer {
         //
         this.zkMasterClient.start();
         this.masterRegistry.registry();
+        //
+        masterSchedulerService.start();
 
         // start QuartzExecutors
         // what system should do if exception
@@ -162,6 +168,7 @@ public class MasterServer {
             this.nettyRemotingServer.close();
             this.masterRegistry.unRegistry();
             this.zkMasterClient.close();
+            this.masterSchedulerService.close();
 
             //close quartz
             try{
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index a5598ee..6949ada 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -34,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.util.concurrent.ThreadPoolExecutor;
 
 /**
@@ -50,11 +48,6 @@ public class MasterSchedulerService extends Thread {
     private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
 
     /**
-     * master exec service
-     */
-    private ThreadPoolExecutor masterExecService;
-
-    /**
      * dolphinscheduler database interface
      */
     @Autowired
@@ -66,28 +59,33 @@ public class MasterSchedulerService extends Thread {
     @Autowired
     private ZKMasterClient zkMasterClient;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      *  netty remoting client
      */
-    private NettyRemotingClient nettyRemotingClient;
-
+    private final NettyRemotingClient nettyRemotingClient;
 
-    @Autowired
-    private MasterConfig masterConfig;
+    /**
+     * master exec service
+     */
+    private final ThreadPoolExecutor masterExecService;
 
     /**
      * constructor of MasterSchedulerThread
      */
-    @PostConstruct
-    public void init(){
+    public MasterSchedulerService(){
         this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
         NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+    }
+
+    public void start(){
         super.setName("MasterSchedulerThread");
         super.start();
     }
 
-    @PreDestroy
     public void close(){
         nettyRemotingClient.close();
         logger.info("master schedule service stopped...");