You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/25 15:22:44 UTC
[dolphinscheduler] 02/02: [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 3e899bee0613e5cb4787feb53c3465f6220b25f6
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Sep 24 18:57:20 2022 +0800
[fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
* [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000
* remove unnecessary locks
* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
Co-authored-by: caishunfeng <ca...@gmail.com>
Co-authored-by: caishunfeng <ca...@gmail.com>
---
.../server/master/registry/ServerNodeManager.java | 167 +++++++++++++--------
1 file changed, 106 insertions(+), 61 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 147fff1364..ef9111cae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@@ -34,14 +35,12 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -57,8 +56,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import javax.annotation.PreDestroy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* server node manager
@@ -74,16 +78,25 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
-
private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/**
- * worker group nodes, workerGroup -> ips
+ * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
+ /**
+ * worker group nodes from registry center, workerGroup -> ips
+ */
+ private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
+
+ /**
+ * worker group nodes from db, workerGroup -> ips
+ */
+ private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
+
/**
* master nodes
*/
@@ -138,7 +151,6 @@ public class ServerNodeManager implements InitializingBean {
return MASTER_SIZE;
}
-
/**
* init listener
*
@@ -146,22 +158,20 @@ public class ServerNodeManager implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
- /**
- * load nodes from zookeeper
- */
+
+ // load nodes from zookeeper
load();
- /**
- * init executor service
- */
- executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
+ // init executor service
+ executorService =
+ Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
- /*
- * init MasterNodeListener listener
- */
+
+ // init MasterNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
- /*
- * init WorkerNodeListener listener
- */
+
+ // init WorkerNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
}
@@ -169,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper
*/
public void load() {
- /*
- * master nodes from zookeeper
- */
+ // master nodes from zookeeper
updateMasterNodes();
- /*
- * worker group nodes from zookeeper
- */
+ // worker group nodes from zookeeper
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
- syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
+ syncWorkerGroupNodesFromRegistry(workerGroup,
+ registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
}
}
@@ -191,29 +198,38 @@ public class ServerNodeManager implements InitializingBean {
@Override
public void run() {
try {
+ dbWorkerGroupNodes.clear();
+
// sync worker node info
- Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+ Map<String, String> registryWorkerNodeMap = registryClient
+ .getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
String workerGroupName = wg.getName();
- Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+ Set<String> workerAddress = getWorkerAddressByWorkerGroup(
+ registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) {
- syncWorkerGroupNodes(workerGroupName, workerAddress);
+ Set<String> workerNodes = dbWorkerGroupNodes
+ .getOrDefault(workerGroupName, new HashSet<>());
+ workerNodes.clear();
+ workerNodes.addAll(workerAddress);
+ dbWorkerGroupNodes.put(workerGroupName, workerNodes);
}
}
}
- notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+ } finally {
+ refreshWorkerGroupNodes();
}
}
}
-
- protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+ protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
+ WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
@@ -238,29 +254,27 @@ public class ServerNodeManager implements InitializingBean {
try {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
- throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
+ throw new IllegalArgumentException(
+ String.format("worker group path : %s is not valid, ignore", path));
}
final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
+ logger.debug("received subscribe event : {}", event);
+ Collection<String> currentNodes = registryClient
+ .getWorkerGroupNodesDirectly(workerGroupName);
+ syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
+
if (type == Type.ADD) {
- logger.info("worker group node : {} added.", path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- logger.info("currentNodes : {}", currentNodes);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
+ logger.info("worker group node : {} added, currentNodes : {}", path,
+ currentNodes);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
- logger.debug("worker group node : {} update, data: {}", path, data);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
-
- syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
+ syncSingleWorkerNodeInfo(workerAddress,
+ JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
- notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
@@ -272,6 +286,7 @@ public class ServerNodeManager implements InitializingBean {
}
class MasterDataListener implements SubscribeListener {
+
@Override
public void notify(Event event) {
final String path = event.path();
@@ -328,28 +343,57 @@ public class ServerNodeManager implements InitializingBean {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
- logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
+ logger.warn("current addr:{} is not in active master list",
+ masterConfig.getMasterAddress());
}
- logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
+ logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
+ MASTER_SLOT, masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
}
/**
- * sync worker group nodes
+ * sync worker group nodes from registry center
*
* @param workerGroup worker group
* @param nodes worker nodes
+ * @param type event type
+ */
+ private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
+ Type type) {
+ try {
+ if (type == Type.REMOVE) {
+ if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
+ logger.warn("cannot remove worker group {}, not in active list", workerGroup);
+ return;
+ }
+ registryWorkerGroupNodes.remove(workerGroup);
+ } else {
+ Set<String> workerNodes = registryWorkerGroupNodes
+ .getOrDefault(workerGroup, new HashSet<>());
+ workerNodes.clear();
+ workerNodes.addAll(nodes);
+ registryWorkerGroupNodes.put(workerGroup, workerNodes);
+ }
+ } finally {
+ refreshWorkerGroupNodes();
+ }
+ }
+
+ /**
+ * refresh worker group nodes
*/
- private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
+ private void refreshWorkerGroupNodes() {
workerGroupWriteLock.lock();
try {
- Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
- workerNodes.clear();
- workerNodes.addAll(nodes);
- workerGroupNodes.put(workerGroup, workerNodes);
+ workerGroupNodes.clear();
+ workerGroupNodes.putAll(registryWorkerGroupNodes);
+ workerGroupNodes.putAll(dbWorkerGroupNodes);
+ logger.debug("refresh worker group nodes, current list: {}",
+ Arrays.toString(workerGroupNodes.keySet().toArray()));
} finally {
+ notifyWorkerInfoChangeListeners();
workerGroupWriteLock.unlock();
}
}
@@ -414,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
try {
workerNodeInfo.clear();
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
- workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+ workerNodeInfo.put(entry.getKey(),
+ JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoWriteLock.unlock();