You are viewing a plain-text version of this content. Its canonical link was a "here" hyperlink on the original archive page; that link is not preserved in this plain-text copy.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/25 15:22:44 UTC

[dolphinscheduler] 02/02: [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 3e899bee0613e5cb4787feb53c3465f6220b25f6
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Sep 24 18:57:20 2022 +0800

    [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
    
    * [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000
    
    * remove unnecessary locks
    
    * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../server/master/registry/ServerNodeManager.java  | 167 +++++++++++++--------
 1 file changed, 106 insertions(+), 61 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 147fff1364..ef9111cae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -34,14 +35,12 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
-import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,8 +56,13 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import javax.annotation.PreDestroy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * server node manager
@@ -74,16 +78,25 @@ public class ServerNodeManager implements InitializingBean {
     private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
 
-
     private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
 
     /**
-     * worker group nodes, workerGroup -> ips
+     * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
 
+    /**
+     * worker group nodes from registry center, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
+
+    /**
+     * worker group nodes from db, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
+
     /**
      * master nodes
      */
@@ -138,7 +151,6 @@ public class ServerNodeManager implements InitializingBean {
         return MASTER_SIZE;
     }
 
-
     /**
      * init listener
      *
@@ -146,22 +158,20 @@ public class ServerNodeManager implements InitializingBean {
      */
     @Override
     public void afterPropertiesSet() throws Exception {
-        /**
-         * load nodes from zookeeper
-         */
+
+        // load nodes from zookeeper
         load();
-        /**
-         * init executor service
-         */
-        executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
+        // init executor service
+        executorService =
+                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
         executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
-        /*
-         * init MasterNodeListener listener
-         */
+
+        // init MasterNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
-        /*
-         * init WorkerNodeListener listener
-         */
+
+        // init WorkerNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
     }
 
@@ -169,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
      * load nodes from zookeeper
      */
     public void load() {
-        /*
-         * master nodes from zookeeper
-         */
+        // master nodes from zookeeper
         updateMasterNodes();
 
-        /*
-         * worker group nodes from zookeeper
-         */
+        // worker group nodes from zookeeper
         Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
         for (String workerGroup : workerGroups) {
-            syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
+            syncWorkerGroupNodesFromRegistry(workerGroup,
+                    registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
         }
     }
 
@@ -191,29 +198,38 @@ public class ServerNodeManager implements InitializingBean {
         @Override
         public void run() {
             try {
+                dbWorkerGroupNodes.clear();
+
                 // sync worker node info
-                Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+                Map<String, String> registryWorkerNodeMap = registryClient
+                        .getServerMaps(NodeType.WORKER, true);
                 syncAllWorkerNodeInfo(registryWorkerNodeMap);
                 // sync worker group nodes from database
                 List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
                 if (CollectionUtils.isNotEmpty(workerGroupList)) {
                     for (WorkerGroup wg : workerGroupList) {
                         String workerGroupName = wg.getName();
-                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(
+                                registryWorkerNodeMap, wg);
                         if (!workerAddress.isEmpty()) {
-                            syncWorkerGroupNodes(workerGroupName, workerAddress);
+                            Set<String> workerNodes = dbWorkerGroupNodes
+                                    .getOrDefault(workerGroupName, new HashSet<>());
+                            workerNodes.clear();
+                            workerNodes.addAll(workerAddress);
+                            dbWorkerGroupNodes.put(workerGroupName, workerNodes);
                         }
                     }
                 }
-                notifyWorkerInfoChangeListeners();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+            } finally {
+                refreshWorkerGroupNodes();
             }
         }
     }
 
-
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
+                                                        WorkerGroup wg) {
         Set<String> nodes = new HashSet<>();
         String[] addrs = wg.getAddrList().split(Constants.COMMA);
         for (String addr : addrs) {
@@ -238,29 +254,27 @@ public class ServerNodeManager implements InitializingBean {
                 try {
                     String[] parts = path.split("/");
                     if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
-                        throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
+                        throw new IllegalArgumentException(
+                                String.format("worker group path : %s is not valid, ignore", path));
                     }
                     final String workerGroupName = parts[parts.length - 2];
                     final String workerAddress = parts[parts.length - 1];
 
+                    logger.debug("received subscribe event : {}", event);
+                    Collection<String> currentNodes = registryClient
+                            .getWorkerGroupNodesDirectly(workerGroupName);
+                    syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
+
                     if (type == Type.ADD) {
-                        logger.info("worker group node : {} added.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
+                        logger.info("worker group node : {} added, currentNodes : {}", path,
+                                currentNodes);
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
-                        logger.debug("worker group node : {} update, data: {}", path, data);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
-
-                        syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
+                        syncSingleWorkerNodeInfo(workerAddress,
+                                JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
-                    notifyWorkerInfoChangeListeners();
                 } catch (IllegalArgumentException ex) {
                     logger.warn(ex.getMessage());
                 } catch (Exception ex) {
@@ -272,6 +286,7 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     class MasterDataListener implements SubscribeListener {
+
         @Override
         public void notify(Event event) {
             final String path = event.path();
@@ -328,28 +343,57 @@ public class ServerNodeManager implements InitializingBean {
                 MASTER_SIZE = nodes.size();
                 MASTER_SLOT = index;
             } else {
-                logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
+                logger.warn("current addr:{} is not in active master list",
+                        masterConfig.getMasterAddress());
             }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
+            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
+                    MASTER_SLOT, masterConfig.getMasterAddress());
         } finally {
             masterLock.unlock();
         }
     }
 
     /**
-     * sync worker group nodes
+     * sync worker group nodes from registry center
      *
      * @param workerGroup worker group
      * @param nodes worker nodes
+     * @param type event type
+     */
+    private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
+                                                  Type type) {
+        try {
+            if (type == Type.REMOVE) {
+                if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
+                    logger.warn("cannot remove worker group {}, not in active list", workerGroup);
+                    return;
+                }
+                registryWorkerGroupNodes.remove(workerGroup);
+            } else {
+                Set<String> workerNodes = registryWorkerGroupNodes
+                        .getOrDefault(workerGroup, new HashSet<>());
+                workerNodes.clear();
+                workerNodes.addAll(nodes);
+                registryWorkerGroupNodes.put(workerGroup, workerNodes);
+            }
+        } finally {
+            refreshWorkerGroupNodes();
+        }
+    }
+
+    /**
+     * refresh worker group nodes
      */
-    private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
+    private void refreshWorkerGroupNodes() {
         workerGroupWriteLock.lock();
         try {
-            Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
-            workerNodes.clear();
-            workerNodes.addAll(nodes);
-            workerGroupNodes.put(workerGroup, workerNodes);
+            workerGroupNodes.clear();
+            workerGroupNodes.putAll(registryWorkerGroupNodes);
+            workerGroupNodes.putAll(dbWorkerGroupNodes);
+            logger.debug("refresh worker group nodes, current list: {}",
+                    Arrays.toString(workerGroupNodes.keySet().toArray()));
         } finally {
+            notifyWorkerInfoChangeListeners();
             workerGroupWriteLock.unlock();
         }
     }
@@ -414,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerNodeInfo.clear();
             for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+                workerNodeInfo.put(entry.getKey(),
+                        JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
             }
         } finally {
             workerNodeInfoWriteLock.unlock();