You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/14 10:10:35 UTC

[incubator-dolphinscheduler] branch dev updated: The master and worker server exit exception #2163 (#2176)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 80c6ce5  The master and worker server exit exception #2163 (#2176)
80c6ce5 is described below

commit 80c6ce5711ab77e104df83ea70853d86916135b7
Author: Rubik-W <39...@users.noreply.github.com>
AuthorDate: Sat Mar 14 18:10:27 2020 +0800

    The master and worker server exit exception #2163 (#2176)
    
    * fix: #2163
    
    * fix: format
---
 .../common/thread/ThreadUtils.java                 | 17 ++++++--
 .../server/master/MasterServer.java                | 33 ++++++++--------
 .../server/worker/WorkerServer.java                | 45 ++++++++--------------
 3 files changed, 46 insertions(+), 49 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 0a4ed9b..d8ef0bb 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -120,12 +120,24 @@ public class ThreadUtils {
 
     /**
      * Wrapper over ScheduledThreadPoolExecutor
+     * @param threadName name format given to the threads created by the thread factory
      * @param corePoolSize
      * @return
      */
-    public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) {
+    public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) {
+        return newThreadScheduledExecutor(threadName, corePoolSize, true);
+    }
+
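+    /**
+     * Wrapper over ScheduledThreadPoolExecutor whose threads may be daemon or non-daemon
+     * @param threadName name format given to the threads created by the thread factory
+     * @param corePoolSize core pool size passed to the ScheduledThreadPoolExecutor
+     * @param isDaemon whether the threads created by the thread factory are daemon threads
+     * @return the created scheduled executor
+     */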
+    public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) {
         ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setDaemon(true)
+                .setDaemon(isDaemon)
                 .setNameFormat(threadName)
                 .build();
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
@@ -135,7 +147,6 @@ public class ThreadUtils {
         return executor;
     }
 
-
     public static ThreadInfo getThreadInfo(Thread t) {
         long tid = t.getId();
         return threadBean.getThreadInfo(tid, STACK_DEPTH);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6b5063c..9512b1a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
+import org.apache.dolphinscheduler.server.worker.WorkerServer;
 import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -37,8 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +49,9 @@ import java.util.concurrent.TimeUnit;
 /**
  * master server
  */
-@ComponentScan("org.apache.dolphinscheduler")
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
+})
 public class MasterServer implements IStoppable {
 
     /**
@@ -112,7 +117,7 @@ public class MasterServer implements IStoppable {
 
         masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
 
-        heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
+        heartbeatMasterService = ThreadUtils.newThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM, false);
 
         // heartbeat thread implement
         Runnable heartBeatThread = heartBeatThread();
@@ -147,23 +152,17 @@ public class MasterServer implements IStoppable {
             }
             logger.error("start Quartz failed", e);
         }
-
-
-        /**
-         *  register hooks, which are called before the process exits
-         */
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                if (zkMasterClient.getActiveMasterNum() <= 1) {
-                    zkMasterClient.getAlertDao().sendServerStopedAlert(
-                        1, OSUtils.getHost(), "Master-Server");
-                }
-                stop("shutdownhook");
-            }
-        }));
     }
 
+    @PreDestroy
+    public void destroy() {
+        // master server exit alert
+        if (zkMasterClient.getActiveMasterNum() <= 1) {
+            zkMasterClient.getAlertDao().sendServerStopedAlert(
+                    1, OSUtils.getHost(), "Master-Server");
+        }
+        stop("shutdownhook");
+    }
 
     /**
      * gracefully stop
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ace9307..86bb7d3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.MasterServer;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
@@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -56,7 +60,10 @@ import java.util.concurrent.TimeUnit;
 /**
  *  worker server
  */
-@ComponentScan("org.apache.dolphinscheduler")
+@SpringBootApplication
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {MasterServer.class})
+})
 public class WorkerServer implements IStoppable {
 
     /**
@@ -104,11 +111,6 @@ public class WorkerServer implements IStoppable {
      */
     private ExecutorService fetchTaskExecutorService;
 
-    /**
-     * CountDownLatch latch
-     */
-    private CountDownLatch latch;
-
     @Value("${server.is-combined-server:false}")
     private Boolean isCombinedServer;
 
@@ -149,7 +151,7 @@ public class WorkerServer implements IStoppable {
 
         this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
 
-        heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
+        heartbeatWorkerService = ThreadUtils.newThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM, false);
 
         // heartbeat thread implement
         Runnable heartBeatThread = heartBeatThread();
@@ -171,29 +173,15 @@ public class WorkerServer implements IStoppable {
 
         // submit fetch task thread
         fetchTaskExecutorService.execute(fetchTaskThread);
+    }
 
-        /**
-         * register hooks, which are called before the process exits
-         */
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                // worker server exit alert
-                if (zkWorkerClient.getActiveMasterNum() <= 1) {
-                    alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
-                }
-                stop("shutdownhook");
-            }
-        }));
-
-        //let the main thread await
-        latch = new CountDownLatch(1);
-        if (!isCombinedServer) {
-            try {
-                latch.await();
-            } catch (InterruptedException ignore) {
-            }
+    @PreDestroy
+    public void destroy() {
+        // worker server exit alert
+        if (zkWorkerClient.getActiveMasterNum() <= 1) {
+            alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
         }
+        stop("shutdownhook");
     }
 
     @Override
@@ -251,7 +239,6 @@ public class WorkerServer implements IStoppable {
             }catch (Exception e){
                 logger.warn("zookeeper service stopped exception:{}",e.getMessage());
             }
-            latch.countDown();
             logger.info("zookeeper service stopped");
 
         } catch (Exception e) {