You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/24 10:57:36 UTC

[dolphinscheduler] branch dev updated: [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ada7cf71d5 [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
ada7cf71d5 is described below

commit ada7cf71d5c70174c7d7c6608f4f60896db5d6b9
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Sep 24 18:57:20 2022 +0800

    [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
    
    * [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000
    
    * remove unnecessary locks
    
    * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../server/master/registry/ServerNodeManager.java  | 141 +++++++++++++--------
 1 file changed, 91 insertions(+), 50 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 3ec10b3ed7..19c42c0711 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.registry;
 import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
 import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 
+import java.util.Arrays;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -82,10 +83,20 @@ public class ServerNodeManager implements InitializingBean {
     private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
 
     /**
-     * worker group nodes, workerGroup -> ips
+     * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
 
+    /**
+     * worker group nodes from registry center, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
+
+    /**
+     * worker group nodes from db, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
+
     /**
      * master nodes
      */
@@ -140,6 +151,7 @@ public class ServerNodeManager implements InitializingBean {
         return MASTER_SIZE;
     }
 
+
     /**
      * init listener
      *
@@ -147,23 +159,19 @@ public class ServerNodeManager implements InitializingBean {
      */
     @Override
     public void afterPropertiesSet() throws Exception {
-        /**
-         * load nodes from zookeeper
-         */
+
+        // load nodes from zookeeper
         load();
-        /**
-         * init executor service
-         */
+
+        // init executor service
         executorService =
-                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
         executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
-        /*
-         * init MasterNodeListener listener
-         */
+
+        // init MasterNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
-        /*
-         * init WorkerNodeListener listener
-         */
+
+        // init WorkerNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
     }
 
@@ -171,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
      * load nodes from zookeeper
      */
     public void load() {
-        /*
-         * master nodes from zookeeper
-         */
+        // master nodes from zookeeper
         updateMasterNodes();
 
-        /*
-         * worker group nodes from zookeeper
-         */
+        // worker group nodes from zookeeper
         Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
         for (String workerGroup : workerGroups) {
-            syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
+            syncWorkerGroupNodesFromRegistry(workerGroup,
+                registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
         }
     }
 
@@ -193,28 +198,39 @@ public class ServerNodeManager implements InitializingBean {
         @Override
         public void run() {
             try {
+                dbWorkerGroupNodes.clear();
+
                 // sync worker node info
-                Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+                Map<String, String> registryWorkerNodeMap = registryClient
+                    .getServerMaps(NodeType.WORKER, true);
                 syncAllWorkerNodeInfo(registryWorkerNodeMap);
                 // sync worker group nodes from database
                 List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
                 if (CollectionUtils.isNotEmpty(workerGroupList)) {
                     for (WorkerGroup wg : workerGroupList) {
                         String workerGroupName = wg.getName();
-                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(
+                            registryWorkerNodeMap, wg);
                         if (!workerAddress.isEmpty()) {
-                            syncWorkerGroupNodes(workerGroupName, workerAddress);
+                            Set<String> workerNodes = dbWorkerGroupNodes
+                                .getOrDefault(workerGroupName, new HashSet<>());
+                            workerNodes.clear();
+                            workerNodes.addAll(workerAddress);
+                            dbWorkerGroupNodes.put(workerGroupName, workerNodes);
                         }
                     }
                 }
-                notifyWorkerInfoChangeListeners();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+            } finally {
+                refreshWorkerGroupNodes();
             }
         }
     }
 
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+
+    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
+        WorkerGroup wg) {
         Set<String> nodes = new HashSet<>();
         String[] addrs = wg.getAddrList().split(Constants.COMMA);
         for (String addr : addrs) {
@@ -240,29 +256,26 @@ public class ServerNodeManager implements InitializingBean {
                     String[] parts = path.split("/");
                     if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
                         throw new IllegalArgumentException(
-                                String.format("worker group path : %s is not valid, ignore", path));
+                            String.format("worker group path : %s is not valid, ignore", path));
                     }
                     final String workerGroupName = parts[parts.length - 2];
                     final String workerAddress = parts[parts.length - 1];
 
+                    logger.debug("received subscribe event : {}", event);
+                    Collection<String> currentNodes = registryClient
+                        .getWorkerGroupNodesDirectly(workerGroupName);
+                    syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
+
                     if (type == Type.ADD) {
-                        logger.info("worker group node : {} added.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
+                        logger.info("worker group node : {} added, currentNodes : {}", path,
+                            currentNodes);
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
-                        logger.debug("worker group node : {} update, data: {}", path, data);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
-
-                        syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
+                        syncSingleWorkerNodeInfo(workerAddress,
+                            JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
-                    notifyWorkerInfoChangeListeners();
                 } catch (IllegalArgumentException ex) {
                     logger.warn(ex.getMessage());
                 } catch (Exception ex) {
@@ -274,7 +287,6 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     class MasterDataListener implements SubscribeListener {
-
         @Override
         public void notify(Event event) {
             final String path = event.path();
@@ -331,29 +343,57 @@ public class ServerNodeManager implements InitializingBean {
                 MASTER_SIZE = nodes.size();
                 MASTER_SLOT = index;
             } else {
-                logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
-            }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
+                logger.warn("current addr:{} is not in active master list",
                     masterConfig.getMasterAddress());
+            }
+            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
+                MASTER_SLOT, masterConfig.getMasterAddress());
         } finally {
             masterLock.unlock();
         }
     }
 
     /**
-     * sync worker group nodes
+     * sync worker group nodes from registry center
      *
      * @param workerGroup worker group
      * @param nodes worker nodes
+     * @param type event type
+     */
+    private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
+        Type type) {
+        try {
+            if (type == Type.REMOVE) {
+                if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
+                    logger.warn("cannot remove worker group {}, not in active list", workerGroup);
+                    return;
+                }
+                registryWorkerGroupNodes.remove(workerGroup);
+            } else {
+                Set<String> workerNodes = registryWorkerGroupNodes
+                    .getOrDefault(workerGroup, new HashSet<>());
+                workerNodes.clear();
+                workerNodes.addAll(nodes);
+                registryWorkerGroupNodes.put(workerGroup, workerNodes);
+            }
+        } finally {
+            refreshWorkerGroupNodes();
+        }
+    }
+
+    /**
+     * refresh worker group nodes
      */
-    private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
+    private void refreshWorkerGroupNodes() {
         workerGroupWriteLock.lock();
         try {
-            Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
-            workerNodes.clear();
-            workerNodes.addAll(nodes);
-            workerGroupNodes.put(workerGroup, workerNodes);
+            workerGroupNodes.clear();
+            workerGroupNodes.putAll(registryWorkerGroupNodes);
+            workerGroupNodes.putAll(dbWorkerGroupNodes);
+            logger.debug("refresh worker group nodes, current list: {}",
+                Arrays.toString(workerGroupNodes.keySet().toArray()));
         } finally {
+            notifyWorkerInfoChangeListeners();
             workerGroupWriteLock.unlock();
         }
     }
@@ -418,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerNodeInfo.clear();
             for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+                workerNodeInfo.put(entry.getKey(),
+                    JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
             }
         } finally {
             workerNodeInfoWriteLock.unlock();