You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/24 10:57:36 UTC
[dolphinscheduler] branch dev updated: [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ada7cf71d5 [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
ada7cf71d5 is described below
commit ada7cf71d5c70174c7d7c6608f4f60896db5d6b9
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Sep 24 18:57:20 2022 +0800
[fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
* [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000
* remove unnecessary locks
* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
Co-authored-by: caishunfeng <ca...@gmail.com>
Co-authored-by: caishunfeng <ca...@gmail.com>
---
.../server/master/registry/ServerNodeManager.java | 141 +++++++++++++--------
1 file changed, 91 insertions(+), 50 deletions(-)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 3ec10b3ed7..19c42c0711 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import java.util.Arrays;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@@ -82,10 +83,20 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/**
- * worker group nodes, workerGroup -> ips
+ * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
+ /**
+ * worker group nodes from registry center, workerGroup -> ips
+ */
+ private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
+
+ /**
+ * worker group nodes from db, workerGroup -> ips
+ */
+ private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
+
/**
* master nodes
*/
@@ -140,6 +151,7 @@ public class ServerNodeManager implements InitializingBean {
return MASTER_SIZE;
}
+
/**
* init listener
*
@@ -147,23 +159,19 @@ public class ServerNodeManager implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
- /**
- * load nodes from zookeeper
- */
+
+ // load nodes from zookeeper
load();
- /**
- * init executor service
- */
+
+ // init executor service
executorService =
- Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+ Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
- /*
- * init MasterNodeListener listener
- */
+
+ // init MasterNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
- /*
- * init WorkerNodeListener listener
- */
+
+ // init WorkerNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
}
@@ -171,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper
*/
public void load() {
- /*
- * master nodes from zookeeper
- */
+ // master nodes from zookeeper
updateMasterNodes();
- /*
- * worker group nodes from zookeeper
- */
+ // worker group nodes from zookeeper
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
- syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
+ syncWorkerGroupNodesFromRegistry(workerGroup,
+ registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
}
}
@@ -193,28 +198,39 @@ public class ServerNodeManager implements InitializingBean {
@Override
public void run() {
try {
+ dbWorkerGroupNodes.clear();
+
// sync worker node info
- Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+ Map<String, String> registryWorkerNodeMap = registryClient
+ .getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
String workerGroupName = wg.getName();
- Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+ Set<String> workerAddress = getWorkerAddressByWorkerGroup(
+ registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) {
- syncWorkerGroupNodes(workerGroupName, workerAddress);
+ Set<String> workerNodes = dbWorkerGroupNodes
+ .getOrDefault(workerGroupName, new HashSet<>());
+ workerNodes.clear();
+ workerNodes.addAll(workerAddress);
+ dbWorkerGroupNodes.put(workerGroupName, workerNodes);
}
}
}
- notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+ } finally {
+ refreshWorkerGroupNodes();
}
}
}
- protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+
+ protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
+ WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
@@ -240,29 +256,26 @@ public class ServerNodeManager implements InitializingBean {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(
- String.format("worker group path : %s is not valid, ignore", path));
+ String.format("worker group path : %s is not valid, ignore", path));
}
final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
+ logger.debug("received subscribe event : {}", event);
+ Collection<String> currentNodes = registryClient
+ .getWorkerGroupNodesDirectly(workerGroupName);
+ syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
+
if (type == Type.ADD) {
- logger.info("worker group node : {} added.", path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- logger.info("currentNodes : {}", currentNodes);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
+ logger.info("worker group node : {} added, currentNodes : {}", path,
+ currentNodes);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
- logger.debug("worker group node : {} update, data: {}", path, data);
- Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
- syncWorkerGroupNodes(workerGroupName, currentNodes);
-
- syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
+ syncSingleWorkerNodeInfo(workerAddress,
+ JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
- notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
@@ -274,7 +287,6 @@ public class ServerNodeManager implements InitializingBean {
}
class MasterDataListener implements SubscribeListener {
-
@Override
public void notify(Event event) {
final String path = event.path();
@@ -331,29 +343,57 @@ public class ServerNodeManager implements InitializingBean {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
- logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
- }
- logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
+ logger.warn("current addr:{} is not in active master list",
masterConfig.getMasterAddress());
+ }
+ logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
+ MASTER_SLOT, masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
}
/**
- * sync worker group nodes
+ * sync worker group nodes from registry center
*
* @param workerGroup worker group
* @param nodes worker nodes
+ * @param type event type
+ */
+ private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
+ Type type) {
+ try {
+ if (type == Type.REMOVE) {
+ if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
+ logger.warn("cannot remove worker group {}, not in active list", workerGroup);
+ return;
+ }
+ registryWorkerGroupNodes.remove(workerGroup);
+ } else {
+ Set<String> workerNodes = registryWorkerGroupNodes
+ .getOrDefault(workerGroup, new HashSet<>());
+ workerNodes.clear();
+ workerNodes.addAll(nodes);
+ registryWorkerGroupNodes.put(workerGroup, workerNodes);
+ }
+ } finally {
+ refreshWorkerGroupNodes();
+ }
+ }
+
+ /**
+ * refresh worker group nodes
*/
- private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
+ private void refreshWorkerGroupNodes() {
workerGroupWriteLock.lock();
try {
- Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
- workerNodes.clear();
- workerNodes.addAll(nodes);
- workerGroupNodes.put(workerGroup, workerNodes);
+ workerGroupNodes.clear();
+ workerGroupNodes.putAll(registryWorkerGroupNodes);
+ workerGroupNodes.putAll(dbWorkerGroupNodes);
+ logger.debug("refresh worker group nodes, current list: {}",
+ Arrays.toString(workerGroupNodes.keySet().toArray()));
} finally {
+ notifyWorkerInfoChangeListeners();
workerGroupWriteLock.unlock();
}
}
@@ -418,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
try {
workerNodeInfo.clear();
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
- workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+ workerNodeInfo.put(entry.getKey(),
+ JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoWriteLock.unlock();