You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/14 10:10:35 UTC
[incubator-dolphinscheduler] branch dev updated: The master and
worker server exit exception #2163 (#2176)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 80c6ce5 The master and worker server exit exception #2163 (#2176)
80c6ce5 is described below
commit 80c6ce5711ab77e104df83ea70853d86916135b7
Author: Rubik-W <39...@users.noreply.github.com>
AuthorDate: Sat Mar 14 18:10:27 2020 +0800
The master and worker server exit exception #2163 (#2176)
* fix: #2163
* fix: format
---
.../common/thread/ThreadUtils.java | 17 ++++++--
.../server/master/MasterServer.java | 33 ++++++++--------
.../server/worker/WorkerServer.java | 45 ++++++++--------------
3 files changed, 46 insertions(+), 49 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 0a4ed9b..d8ef0bb 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -120,12 +120,24 @@ public class ThreadUtils {
/**
* Wrapper over ScheduledThreadPoolExecutor
+ * @param threadName
* @param corePoolSize
* @return
*/
- public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName,int corePoolSize) {
+ public static ScheduledExecutorService newDaemonThreadScheduledExecutor(String threadName, int corePoolSize) {
+ return newThreadScheduledExecutor(threadName, corePoolSize, true);
+ }
+
+ /**
+ * Wrapper over ScheduledThreadPoolExecutor
+ * @param threadName
+ * @param corePoolSize
+ * @param isDaemon
+ * @return
+ */
+ public static ScheduledExecutorService newThreadScheduledExecutor(String threadName, int corePoolSize, boolean isDaemon) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
+ .setDaemon(isDaemon)
.setNameFormat(threadName)
.build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
@@ -135,7 +147,6 @@ public class ThreadUtils {
return executor;
}
-
public static ThreadInfo getThreadInfo(Thread t) {
long tid = t.getId();
return threadBean.getThreadInfo(tid, STACK_DEPTH);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 6b5063c..9512b1a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
+import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -37,8 +38,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -46,7 +49,9 @@ import java.util.concurrent.TimeUnit;
/**
* master server
*/
-@ComponentScan("org.apache.dolphinscheduler")
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+ @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
+})
public class MasterServer implements IStoppable {
/**
@@ -112,7 +117,7 @@ public class MasterServer implements IStoppable {
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
- heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
+ heartbeatMasterService = ThreadUtils.newThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
@@ -147,23 +152,17 @@ public class MasterServer implements IStoppable {
}
logger.error("start Quartz failed", e);
}
-
-
- /**
- * register hooks, which are called before the process exits
- */
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- if (zkMasterClient.getActiveMasterNum() <= 1) {
- zkMasterClient.getAlertDao().sendServerStopedAlert(
- 1, OSUtils.getHost(), "Master-Server");
- }
- stop("shutdownhook");
- }
- }));
}
+ @PreDestroy
+ public void destroy() {
+ // master server exit alert
+ if (zkMasterClient.getActiveMasterNum() <= 1) {
+ zkMasterClient.getAlertDao().sendServerStopedAlert(
+ 1, OSUtils.getHost(), "Master-Server");
+ }
+ stop("shutdownhook");
+ }
/**
* gracefully stop
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index ace9307..86bb7d3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
@@ -43,10 +44,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -56,7 +60,10 @@ import java.util.concurrent.TimeUnit;
/**
* worker server
*/
-@ComponentScan("org.apache.dolphinscheduler")
+@SpringBootApplication
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+ @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {MasterServer.class})
+})
public class WorkerServer implements IStoppable {
/**
@@ -104,11 +111,6 @@ public class WorkerServer implements IStoppable {
*/
private ExecutorService fetchTaskExecutorService;
- /**
- * CountDownLatch latch
- */
- private CountDownLatch latch;
-
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
@@ -149,7 +151,7 @@ public class WorkerServer implements IStoppable {
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
- heartbeatWorkerService = ThreadUtils.newDaemonThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM);
+ heartbeatWorkerService = ThreadUtils.newThreadScheduledExecutor("Worker-Heartbeat-Thread-Executor", Constants.DEFAUL_WORKER_HEARTBEAT_THREAD_NUM, false);
// heartbeat thread implement
Runnable heartBeatThread = heartBeatThread();
@@ -171,29 +173,15 @@ public class WorkerServer implements IStoppable {
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
+ }
- /**
- * register hooks, which are called before the process exits
- */
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- // worker server exit alert
- if (zkWorkerClient.getActiveMasterNum() <= 1) {
- alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
- }
- stop("shutdownhook");
- }
- }));
-
- //let the main thread await
- latch = new CountDownLatch(1);
- if (!isCombinedServer) {
- try {
- latch.await();
- } catch (InterruptedException ignore) {
- }
+ @PreDestroy
+ public void destroy() {
+ // worker server exit alert
+ if (zkWorkerClient.getActiveMasterNum() <= 1) {
+ alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
}
+ stop("shutdownhook");
}
@Override
@@ -251,7 +239,6 @@ public class WorkerServer implements IStoppable {
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
- latch.countDown();
logger.info("zookeeper service stopped");
} catch (Exception e) {