You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:25:55 UTC
[dolphinscheduler] 28/29: Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit f3250bf5fa6c33cacb92bc31cdcb1d734e8cc27b
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Jul 15 20:06:53 2022 +0800
Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)
* Use try-with-resources to close resources, and add a heartbeat error threshold so the worker is not prevented from closing when the heartbeat check fails
* Move heartbeat error threshold to application.yaml
(cherry picked from commit 2be1d4bf0add2485d4bf322aef7bbaeabfc223de)
---
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../common/thread/ThreadUtils.java | 14 +++++---
.../server/master/MasterServer.java | 36 +++++++++----------
.../server/master/config/MasterConfig.java | 7 ++++
.../master/registry/MasterRegistryClient.java | 8 +++--
.../master/runner/MasterSchedulerBootstrap.java | 3 +-
.../src/main/resources/application.yaml | 2 ++
.../server/registry/HeartBeatTask.java | 19 ++++++++--
.../service/bean/SpringApplicationContext.java | 3 +-
.../apache/dolphinscheduler/StandaloneServer.java | 32 +++++++++++++++--
.../src/main/resources/application.yaml | 4 +++
.../server/worker/WorkerServer.java | 41 +++++++---------------
.../server/worker/config/WorkerConfig.java | 7 ++++
.../worker/registry/WorkerRegistryClient.java | 36 +++++++++++--------
.../src/main/resources/application.yaml | 2 ++
15 files changed, 137 insertions(+), 78 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 5b813b22c7..b4397f007a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -50,6 +50,7 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
+
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String FOLDER_SEPARATOR = "/";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5c8020b7cd..f4f2a17bc7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.experimental.UtilityClass;
@UtilityClass
public class ThreadUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
+
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
@@ -35,10 +41,7 @@ public class ThreadUtils {
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(threadName)
- .build();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
@@ -48,8 +51,9 @@ public class ThreadUtils {
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
- } catch (final InterruptedException ignore) {
+ } catch (final InterruptedException interruptedException) {
Thread.currentThread().interrupt();
+ logger.error("Current thread sleep error", interruptedException);
}
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 66b258d5b2..6711ff3289 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -115,31 +115,27 @@ public class MasterServer implements IStoppable {
* @param cause close cause
*/
public void close(String cause) {
-
- try {
- // set stop signal is true
- // execute only once
- if (!Stopper.stop()) {
- logger.warn("MasterServer is already stopped, current cause: {}", cause);
- return;
- }
+ // set stop signal is true
+ // execute only once
+ if (!Stopper.stop()) {
+ logger.warn("MasterServer is already stopped, current cause: {}", cause);
+ return;
+ }
+ // thread sleep 3 seconds for thread quietly stop
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+ try (MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
+ MasterRPCServer closedRpcServer = masterRPCServer;
+ MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
+ // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
+ // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+ SpringApplicationContext closedSpringContext = springApplicationContext) {
logger.info("Master server is stopping, current cause : {}", cause);
-
- // thread sleep 3 seconds for thread quietly stop
- ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
- // close
- this.masterSchedulerBootstrap.close();
- this.masterRPCServer.close();
- this.masterRegistryClient.closeRegistry();
- // close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
- // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
- springApplicationContext.close();
-
- logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("MasterServer stop failed, current cause: {}", cause, e);
+ return;
}
+ logger.info("MasterServer stopped, current cause: {}", cause);
}
@Override
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index fe51e20984..7f6f124164 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
* Master heart beat task execute interval.
*/
private Duration heartbeatInterval = Duration.ofSeconds(10);
+ /**
+ * Master heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the master will close.
+ */
+ private int heartbeatErrorThreshold = 5;
/**
* task submit max retry times.
*/
@@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
+ if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
+ errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
+ }
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 486c360117..a59b712828 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
-public class MasterRegistryClient {
+public class MasterRegistryClient implements AutoCloseable {
/**
* logger
@@ -108,7 +108,8 @@ public class MasterRegistryClient {
registryClient.setStoppable(stoppable);
}
- public void closeRegistry() {
+ @Override
+ public void close() {
// TODO unsubscribe MasterRegistryDataListener
deregister();
}
@@ -194,7 +195,8 @@ public class MasterRegistryClient {
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
- registryClient);
+ registryClient,
+ masterConfig.getHeartbeatErrorThreshold());
// remove before persist
registryClient.remove(localNodePath);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index b013a0e9c4..523e5f839d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -57,7 +57,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
-public class MasterSchedulerBootstrap extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
@@ -112,6 +112,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
logger.info("Master schedule bootstrap started...");
}
+ @Override
public void close() {
logger.info("Master schedule bootstrap stopping...");
logger.info("Master schedule bootstrap stopped...");
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml
index ff29b19389..b559806d28 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -98,6 +98,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
+ # Master heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the master will close.
+ heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 129aaff0de..c84abb4182 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
private final String serverType;
private final HeartBeat heartBeat;
+ private final int heartBeatErrorThreshold;
+
+ private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
+
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
- RegistryClient registryClient) {
+ RegistryClient registryClient,
+ int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
+ this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
@@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
String serverType,
RegistryClient registryClient,
int workerThreadCount,
- int workerWaitingTaskCount
- ) {
+ int workerWaitingTaskCount,
+ int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
+ this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
+ heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex);
+ if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
+ registryClient.getStoppable()
+ .stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
+ }
}
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index 61dfcb35d7..5b37b1f72d 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
@Component
-public class SpringApplicationContext implements ApplicationContextAware {
+public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable {
private static ApplicationContext applicationContext;
@@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
/**
* Close this application context, destroying all beans in its bean factory.
*/
+ @Override
public void close() {
((AbstractApplicationContext)applicationContext).close();
}
diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
index 3866c53066..728fed8eaa 100644
--- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
+++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
@@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
import org.apache.curator.test.TestingServer;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationFailedEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
+
+import lombok.NonNull;
@SpringBootApplication
-public class StandaloneServer {
+public class StandaloneServer implements ApplicationListener<ApplicationEvent> {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class);
+
+ private static TestingServer zookeeperServer;
public static void main(String[] args) throws Exception {
- final TestingServer server = new TestingServer(true);
- System.setProperty("registry.zookeeper.connect-string", server.getConnectString());
+ zookeeperServer = new TestingServer(true);
+ System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString());
SpringApplication.run(StandaloneServer.class, args);
}
+
+ @Override
+ public void onApplicationEvent(@NonNull ApplicationEvent event) {
+ if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) {
+ try (TestingServer closedServer = zookeeperServer) {
+ // close the zookeeper server
+ logger.info("Receive spring context close event: {}, will closed zookeeper server", event);
+ } catch (IOException e) {
+ logger.error("Close zookeeper server error", e);
+ }
+ }
+ }
}
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 9a940b6c0e..8411360275 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -116,6 +116,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
+ # Master heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the master will close.
+ heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
@@ -137,6 +139,8 @@ worker:
exec-threads: 10
# worker heartbeat interval
heartbeat-interval: 10s
+ # Worker heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the worker will close.
+ heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker tenant auto create
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 0321b76f44..471bc008eb 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
-import java.util.Set;
import javax.annotation.PostConstruct;
@@ -111,8 +110,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
- Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
- this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
+ this.workerRegistryClient.handleDeadServer();
this.workerManagerThread.start();
@@ -129,37 +127,24 @@ public class WorkerServer implements IStoppable {
}
public void close(String cause) {
- try {
- // execute only once
- // set stop signal is true
- if (!Stopper.stop()) {
- logger.warn("WorkerServer is already stopped, current cause: {}", cause);
- return;
- }
+ if (!Stopper.stop()) {
+ logger.warn("WorkerServer is already stopped, current cause: {}", cause);
+ return;
+ }
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+ try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
+ WorkerRegistryClient closedRegistryClient = workerRegistryClient;
+ AlertClientService closedAlertClientService = alertClientService;
+ SpringApplicationContext closedSpringContext = springApplicationContext;) {
logger.info("Worker server is stopping, current cause : {}", cause);
-
- try {
- // thread sleep 3 seconds for thread quitely stop
- Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
- } catch (Exception e) {
- logger.warn("Worker server close wait error", e);
- }
-
- // close
- this.workerRpcServer.close();
- this.workerRegistryClient.unRegistry();
- this.alertClientService.close();
-
// kill running tasks
this.killAllRunningTasks();
-
- // close the application context
- this.springApplicationContext.close();
- logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("Worker server stop failed, current cause: {}", cause, e);
+ return;
}
+ logger.info("Worker server stopped, current cause: {}", cause);
}
@Override
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index aab162d6d4..2367975715 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234;
private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
+ /**
+ * Worker heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the worker will close.
+ */
+ private int heartbeatErrorThreshold = 5;
private int hostWeight = 100;
private boolean tenantAutoCreate = true;
private boolean tenantDistributedUser = false;
@@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
+ if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
+ errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
+ }
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b33b3ef0d7..99d18acc30 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Sets;
* worker registry
*/
@Service
-public class WorkerRegistryClient {
+public class WorkerRegistryClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
@@ -102,15 +102,15 @@ public class WorkerRegistryClient {
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- workerConfig.getMaxCpuLoadAvg(),
- workerConfig.getReservedMemory(),
- workerConfig.getHostWeight(),
- workerZkPaths,
- Constants.WORKER_TYPE,
- registryClient,
- workerConfig.getExecThreads(),
- workerManagerThread.getThreadPoolQueueSize()
- );
+ workerConfig.getMaxCpuLoadAvg(),
+ workerConfig.getReservedMemory(),
+ workerConfig.getHostWeight(),
+ workerZkPaths,
+ Constants.WORKER_TYPE,
+ registryClient,
+ workerConfig.getExecThreads(),
+ workerManagerThread.getThreadPoolQueueSize(),
+ workerConfig.getHeartbeatErrorThreshold());
for (String workerZKPath : workerZkPaths) {
// remove before persist
@@ -148,8 +148,10 @@ public class WorkerRegistryClient {
logger.error("remove worker zk path exception", ex);
}
- this.heartBeatExecutor.shutdownNow();
- logger.info("heartbeat executor shutdown");
+ if (heartBeatExecutor != null) {
+ heartBeatExecutor.shutdownNow();
+ logger.info("Heartbeat executor shutdown");
+ }
registryClient.close();
logger.info("registry client closed");
@@ -176,8 +178,9 @@ public class WorkerRegistryClient {
return workerPaths;
}
- public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
- registryClient.handleDeadServer(nodeSet, nodeType, opType);
+ public void handleDeadServer() {
+ Set<String> workerZkPaths = getWorkerZkPaths();
+ registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
}
/**
@@ -191,4 +194,9 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
+ @Override
+ public void close() throws IOException {
+ unRegistry();
+ }
+
}
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index d764f9d25b..7d653edc24 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -60,6 +60,8 @@ worker:
exec-threads: 100
# worker heartbeat interval
heartbeat-interval: 10s
+ # Worker heartbeat task error threshold: if the number of consecutive heartbeat errors reaches this value, the worker will close.
+ heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker tenant auto create