Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/25 15:22:42 UTC

[dolphinscheduler] branch 3.1.0-prepare updated (b0b29ed8e1 -> 3e899bee06)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a change to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


    from b0b29ed8e1 [fix][python] Task switch branch not show in webui (#12120)
     new 42d8308940 [Doc][Improvement] Add instructions for process execution type and data complement (#12121)
     new 3e899bee06 [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/docs/en/guide/project/workflow-definition.md  |  56 ++++++-
 docs/docs/zh/guide/project/workflow-definition.md  |  14 +-
 .../new_ui/dev/project/workflow-execution-type.png | Bin 0 -> 751996 bytes
 .../server/master/registry/ServerNodeManager.java  | 167 +++++++++++++--------
 4 files changed, 167 insertions(+), 70 deletions(-)
 create mode 100644 docs/img/new_ui/dev/project/workflow-execution-type.png


[dolphinscheduler] 01/02: [Doc][Improvement] Add instructions for process execution type and data complement (#12121)

Posted by ca...@apache.org.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 42d83089409ade7a11945825ff37dfac187bb06b
Author: Eric Gao <er...@gmail.com>
AuthorDate: Sat Sep 24 09:59:02 2022 +0800

    [Doc][Improvement] Add instructions for process execution type and data complement (#12121)
    
    * Add instructions for process execution type and data complement
    
    * Fix and polish the docs
---
 docs/docs/en/guide/project/workflow-definition.md  |  56 +++++++++++++++++++--
 docs/docs/zh/guide/project/workflow-definition.md  |  14 ++++--
 .../new_ui/dev/project/workflow-execution-type.png | Bin 0 -> 751996 bytes
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/docs/docs/en/guide/project/workflow-definition.md b/docs/docs/en/guide/project/workflow-definition.md
index a19dc8d756..a08cf7a81f 100644
--- a/docs/docs/en/guide/project/workflow-definition.md
+++ b/docs/docs/en/guide/project/workflow-definition.md
@@ -50,6 +50,17 @@ Click the `Save` button, and the "Set DAG chart name" window pops up, as shown i
 
 ![workflow-save](../../../../img/new_ui/dev/project/workflow-save.png)
 
+### Configure workflow (process) execution type
+
+Click the `Save` button and configure `process execution type` in the pop-up window. There are four process execution types:
+
+- `Parallel`: If there are multiple instances of the same workflow definition, execute the instances in parallel.
+- `Serial Wait`: If there are multiple instances of the same workflow definition, execute the instances serially.
+- `Serial Discard`: If there are multiple instances of the same workflow definition, discard the later ones and kill the currently running ones.
+- `Serial Priority`: If there are multiple instances of the same workflow definition, execute the instances serially according to their priority.
+
+![workflow-execution-type](../../../../img/new_ui/dev/project/workflow-execution-type.png)
+
 ## Workflow Definition Operation Function
 
 Click `Project Management -> Workflow -> Workflow Definition` to enter the workflow definition page, as shown below:
@@ -60,14 +71,49 @@ Workflow running parameter description:
 
 * **Failure strategy**: When a task node fails to execute, other parallel task nodes need to execute the strategy. "Continue" means: After a task fails, other task nodes execute normally; "End" means: Terminate all tasks being executed, and terminate the entire process.
 * **Notification strategy**: When the process ends, send process execution information notification emails according to the process status, including no status, success, failure, success or failure.
-* **Process priority**: the priority of process operation, divided into five levels: the highest (HIGHEST), high (HIGH), medium (MEDIUM), low (LOW), the lowest (LOWEST). When the number of master threads is insufficient, processes with higher levels will be executed first in the execution queue, and processes with the same priority will be executed in the order of first-in, first-out.
+* **Process priority**: The priority of process execution. There are five priorities: highest (HIGHEST), high (HIGH), medium (MEDIUM), low (LOW), and lowest (LOWEST). When the number of master threads is insufficient, processes with higher priorities in the execution queue run first. Processes with the same priority run in first-come, first-served order.
 * **Worker grouping**: This process can only be executed in the specified worker machine group. The default is Default, which can be executed on any worker.
 * **Notification Group**: Select Notification Policy||Timeout Alarm||When fault tolerance occurs, process information or emails will be sent to all members in the notification group.
-* **Recipient**: Select Notification Policy||Timeout Alarm||When fault tolerance occurs, process information or alarm email will be sent to the recipient list.
-* **Cc**: Select Notification Policy||Timeout Alarm||When fault tolerance occurs, the process information or alarm email will be copied to the Cc list.
 * **Startup parameters**: Set or override the value of global parameters when starting a new process instance.
-* **Complement**: There are 2 modes of serial complement and parallel complement. Serial complement: within the specified time range, perform complements in sequence from the start date to the end date, and generate N process instances in turn; parallel complement: within the specified time range, perform multiple complements at the same time, and generate N process instances at the same time .
-  * **Complement**: Execute the workflow definition of the specified date, you can select the time range of the supplement (currently only supports the supplement for consecutive days), for example, the data from May 1st to May 10th needs to be supplemented, as shown in the following figure:
+* **Complement (Backfill)**: Run the workflow for a specified historical period. There are two strategies: serial complement and parallel complement. You can select the time period in the UI or fill it in manually.
+  * Serial complement: Run the workflow serially, from the start date to the end date of the time period you set.
+
+  ![workflow-serial](../../../../img/new_ui/dev/project/workflow-serial.png)
+
+  * Parallel complement: Run the workflow in parallel, from the start date to the end date of the time period you set.
+
+  ![workflow-parallel](../../../../img/new_ui/dev/project/workflow-parallel.png)
+
+  * Parallelism: The maximum number of workflow instances of the chosen workflow definition that run at the same time during complement.
+    ![workflow-concurrency-from](../../../../img/new_ui/dev/project/workflow-concurrency-from.png)
+
+  ![workflow-concurrency](../../../../img/new_ui/dev/project/workflow-concurrency.png)
+
+  * Mode of dependent: Whether to also trigger complement for the downstream workflow definitions.
+
+  ![workflow-dependency](../../../../img/new_ui/dev/project/workflow-dependency.png)
+
+  * Schedule date:
+
+    1. Select from pop-up window:
+
+    ![workflow-pageSelection](../../../../img/new_ui/dev/project/workflow-pageSelection.png)
+
+    2. Fill in the time period manually:
+
+    ![workflow-input](../../../../img/new_ui/dev/project/workflow-input.png)
+
+  * Complement with or without scheduling:
+
+    1. Without scheduling: Run the workflow once a day from the start date to the end date of the time period you set. For example, complement from July 7th to 10th without scheduling:
+
+    ![workflow-unconfiguredTimingResult](../../../../img/new_ui/dev/project/workflow-unconfiguredTimingResult.png)
+
+    2. With scheduling: Run the workflow from the start date to the end date according to both the time period and the schedule you set. For example, complement from July 7th to 10th with a schedule of 5 AM every day:
+
+    ![workflow-configuredTiming](../../../../img/new_ui/dev/project/workflow-configuredTiming.png)
+
+    ![workflow-configuredTimingResult](../../../../img/new_ui/dev/project/workflow-configuredTimingResult.png)
 
 The following are the operation functions of the workflow definition list:
 
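The four process execution types documented in the English guide above can be read as a rule for what happens to a newly triggered instance when another instance of the same workflow definition is already running. The following is a minimal sketch of that reading. `ExecutionType`, `Decision` and `ExecutionTypeSketch` are hypothetical names introduced only for this illustration. They are not DolphinScheduler's own classes. The guide also says that `Serial Discard` kills the currently running instances, and this sketch does not model that part.

```java
// Hypothetical types for illustration only; not DolphinScheduler's own classes.
enum ExecutionType { PARALLEL, SERIAL_WAIT, SERIAL_DISCARD, SERIAL_PRIORITY }

enum Decision { RUN_NOW, QUEUE_FIFO, QUEUE_BY_PRIORITY, DISCARD }

final class ExecutionTypeSketch {

    private ExecutionTypeSketch() {
    }

    /**
     * Decides what happens to a newly triggered instance of a workflow definition,
     * given whether another instance of the same definition is already running.
     */
    static Decision onNewInstance(ExecutionType type, boolean otherInstanceRunning) {
        if (type == ExecutionType.PARALLEL || !otherInstanceRunning) {
            // Parallel never waits; the serial types start at once when nothing else is running.
            return Decision.RUN_NOW;
        }
        switch (type) {
            case SERIAL_WAIT:
                return Decision.QUEUE_FIFO;        // wait for the running instance, in arrival order
            case SERIAL_PRIORITY:
                return Decision.QUEUE_BY_PRIORITY; // wait, ordered by process priority
            case SERIAL_DISCARD:
                return Decision.DISCARD;           // the later instance is dropped
            default:
                throw new IllegalArgumentException("unknown execution type: " + type);
        }
    }
}
```

The sketch keeps the check for "is another instance running?" outside the decision function. This makes the difference between the four types a pure function of two inputs, and that function is easy to test.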
diff --git a/docs/docs/zh/guide/project/workflow-definition.md b/docs/docs/zh/guide/project/workflow-definition.md
index 521ec8757f..630e4c5c7b 100644
--- a/docs/docs/zh/guide/project/workflow-definition.md
+++ b/docs/docs/zh/guide/project/workflow-definition.md
@@ -37,6 +37,14 @@
 
   > 其他类型任务,请参考 [任务节点类型和参数设置](#TaskParamers)。 <!-- markdown-link-check-disable-line -->
 
+- **执行策略**
+- `并行`:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。
+- `串行等待`:如果对于同一个工作流定义,同时有多个工作流实例,则串行执行工作流实例。
+- `串行抛弃`:如果对于同一个工作流定义,同时有多个工作流实例,则抛弃后生成的工作流实例并杀掉正在跑的实例。
+- `串行优先`:如果对于同一个工作流定义,同时有多个工作流实例,则按照优先级串行执行工作流实例。
+
+![workflow-execution-type](../../../../img/new_ui/dev/project/workflow-execution-type.png)
+
 ## 工作流定义操作功能
 
 点击项目管理->工作流->工作流定义,进入工作流定义页面,如下图所示:
@@ -74,8 +82,6 @@
 * 流程优先级:流程运行的优先级,分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
 * Worker 分组:该流程只能在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
 * 通知组:选择通知策略||超时报警||发生容错时,会发送流程信息或邮件到通知组里的所有成员。
-* 收件人:选择通知策略||超时报警||发生容错时,会发送流程信息或告警邮件到收件人列表。
-* 抄送人:选择通知策略||超时报警||发生容错时,会抄送流程信息或告警邮件到抄送人列表。
 * 启动参数: 在启动新的流程实例时,设置或覆盖全局参数的值。
 * 补数:指运行指定日期范围内的工作流定义,根据补数策略生成对应的工作流实例,补数策略包括串行补数、并行补数 2 种模式,日期可以通过页面选择或者手动输入。
   * 串行补数:指定时间范围内,从开始日期至结束日期依次执行补数,依次生成多条流程实例;点击运行工作流,选择串行补数模式:例如从7月 9号到7月10号依次执行,依次在流程实例页面生成两条流程实例。
@@ -107,11 +113,11 @@
 
   * 补数与定时配置的关系:
 
-    1. 未配置定时:当没有定时配置时默认会根据所选时间范围进行每天一次的补数,比如该工作流调度日期为7月 7号到7月10号,未配置定时,流程实例为:
+    1. 未配置定时:当没有定时配置时默认会根据所选时间范围进行每天一次的补数,比如该工作流调度日期为7月7号到7月10号,未配置定时,流程实例为:
 
     ![workflow-unconfiguredTimingResult](../../../../img/new_ui/dev/project/workflow-unconfiguredTimingResult.png)
 
-    2. 已配置定时:如果有定时配置则会根据所选的时间范围结合定时配置进行补数,比如该工作流调度日期为7月 7号到7月10号,配置了定时(每日凌晨5点运行),流程实例为:
+    2. 已配置定时:如果有定时配置则会根据所选的时间范围结合定时配置进行补数,比如该工作流调度日期为7月7号到7月10号,配置了定时(每日凌晨5点运行),流程实例为:
 
     ![workflow-configuredTiming](../../../../img/new_ui/dev/project/workflow-configuredTiming.png)
 
diff --git a/docs/img/new_ui/dev/project/workflow-execution-type.png b/docs/img/new_ui/dev/project/workflow-execution-type.png
new file mode 100644
index 0000000000..2e7e731aea
Binary files /dev/null and b/docs/img/new_ui/dev/project/workflow-execution-type.png differ
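The complement (backfill) documentation above combines three ideas: one run per day in the selected range, a time of day taken from the schedule when there is one, and serial or parallel execution bounded by the parallelism setting. Here is a minimal sketch of those three ideas. `ComplementSketch` and its methods are hypothetical. The midnight default for "without scheduling" is an assumption of this sketch, because the docs do not give a time of day for that case. A fixed-size thread pool stands in for the parallelism limit.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ComplementSketch {

    private ComplementSketch() {
    }

    /**
     * One run per day from start to end (inclusive). With a daily schedule the run uses
     * that time of day; without one (null), this sketch assumes midnight.
     */
    static List<LocalDateTime> runTimes(LocalDate start, LocalDate end, LocalTime dailySchedule) {
        LocalTime timeOfDay = dailySchedule == null ? LocalTime.MIDNIGHT : dailySchedule;
        List<LocalDateTime> times = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            times.add(d.atTime(timeOfDay));
        }
        return times;
    }

    /** Serial complement: each run starts only after the previous one has returned. */
    static void runSerial(List<LocalDateTime> times, Consumer<LocalDateTime> run) {
        for (LocalDateTime t : times) {
            run.accept(t);
        }
    }

    /** Parallel complement: at most 'parallelism' runs execute at the same time. */
    static boolean runParallel(List<LocalDateTime> times, int parallelism, Consumer<LocalDateTime> run) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (LocalDateTime t : times) {
            pool.execute(() -> run.accept(t));
        }
        pool.shutdown();
        try {
            // true if every run finished within the timeout
            return pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

For the example in the docs (July 7th to 10th with a 5 AM schedule), `runTimes` yields four runs, one at 05:00 on each day. The docs do not say which year the example uses.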


[dolphinscheduler] 02/02: [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

Posted by ca...@apache.org.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 3e899bee0613e5cb4787feb53c3465f6220b25f6
Author: Yann Ann <xi...@gmail.com>
AuthorDate: Sat Sep 24 18:57:20 2022 +0800

    [fix#12000]Cannot remove the WorkerGroup from the master service (#12050)
    
    * [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000
    
    * remove unnecessary locks
    
    * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
    
    Co-authored-by: caishunfeng <ca...@gmail.com>
---
 .../server/master/registry/ServerNodeManager.java  | 167 +++++++++++++--------
 1 file changed, 106 insertions(+), 61 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 147fff1364..ef9111cae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,8 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -34,14 +35,12 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
-import javax.annotation.PreDestroy;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,8 +56,13 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import javax.annotation.PreDestroy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * server node manager
@@ -74,16 +78,25 @@ public class ServerNodeManager implements InitializingBean {
     private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
 
-
     private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
     private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
     private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
 
     /**
-     * worker group nodes, workerGroup -> ips
+     * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
 
+    /**
+     * worker group nodes from registry center, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
+
+    /**
+     * worker group nodes from db, workerGroup -> ips
+     */
+    private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
+
     /**
      * master nodes
      */
@@ -138,7 +151,6 @@ public class ServerNodeManager implements InitializingBean {
         return MASTER_SIZE;
     }
 
-
     /**
      * init listener
      *
@@ -146,22 +158,20 @@ public class ServerNodeManager implements InitializingBean {
      */
     @Override
     public void afterPropertiesSet() throws Exception {
-        /**
-         * load nodes from zookeeper
-         */
+
+        // load nodes from zookeeper
         load();
-        /**
-         * init executor service
-         */
-        executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
+        // init executor service
+        executorService =
+                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
+
         executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
-        /*
-         * init MasterNodeListener listener
-         */
+
+        // init MasterNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
-        /*
-         * init WorkerNodeListener listener
-         */
+
+        // init WorkerNodeListener listener
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
     }
 
@@ -169,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
      * load nodes from zookeeper
      */
     public void load() {
-        /*
-         * master nodes from zookeeper
-         */
+        // master nodes from zookeeper
         updateMasterNodes();
 
-        /*
-         * worker group nodes from zookeeper
-         */
+        // worker group nodes from zookeeper
         Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
         for (String workerGroup : workerGroups) {
-            syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
+            syncWorkerGroupNodesFromRegistry(workerGroup,
+                    registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
         }
     }
 
@@ -191,29 +198,38 @@ public class ServerNodeManager implements InitializingBean {
         @Override
         public void run() {
             try {
+                dbWorkerGroupNodes.clear();
+
                 // sync worker node info
-                Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
+                Map<String, String> registryWorkerNodeMap = registryClient
+                        .getServerMaps(NodeType.WORKER, true);
                 syncAllWorkerNodeInfo(registryWorkerNodeMap);
                 // sync worker group nodes from database
                 List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
                 if (CollectionUtils.isNotEmpty(workerGroupList)) {
                     for (WorkerGroup wg : workerGroupList) {
                         String workerGroupName = wg.getName();
-                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
+                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(
+                                registryWorkerNodeMap, wg);
                         if (!workerAddress.isEmpty()) {
-                            syncWorkerGroupNodes(workerGroupName, workerAddress);
+                            Set<String> workerNodes = dbWorkerGroupNodes
+                                    .getOrDefault(workerGroupName, new HashSet<>());
+                            workerNodes.clear();
+                            workerNodes.addAll(workerAddress);
+                            dbWorkerGroupNodes.put(workerGroupName, workerNodes);
                         }
                     }
                 }
-                notifyWorkerInfoChangeListeners();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
+            } finally {
+                refreshWorkerGroupNodes();
             }
         }
     }
 
-
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
+    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
+                                                        WorkerGroup wg) {
         Set<String> nodes = new HashSet<>();
         String[] addrs = wg.getAddrList().split(Constants.COMMA);
         for (String addr : addrs) {
@@ -238,29 +254,27 @@ public class ServerNodeManager implements InitializingBean {
                 try {
                     String[] parts = path.split("/");
                     if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
-                        throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
+                        throw new IllegalArgumentException(
+                                String.format("worker group path : %s is not valid, ignore", path));
                     }
                     final String workerGroupName = parts[parts.length - 2];
                     final String workerAddress = parts[parts.length - 1];
 
+                    logger.debug("received subscribe event : {}", event);
+                    Collection<String> currentNodes = registryClient
+                            .getWorkerGroupNodesDirectly(workerGroupName);
+                    syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
+
                     if (type == Type.ADD) {
-                        logger.info("worker group node : {} added.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        logger.info("currentNodes : {}", currentNodes);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
+                        logger.info("worker group node : {} added, currentNodes : {}", path,
+                                currentNodes);
                     } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
-                        logger.debug("worker group node : {} update, data: {}", path, data);
-                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
-                        syncWorkerGroupNodes(workerGroupName, currentNodes);
-
-                        syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
+                        syncSingleWorkerNodeInfo(workerAddress,
+                                JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
-                    notifyWorkerInfoChangeListeners();
                 } catch (IllegalArgumentException ex) {
                     logger.warn(ex.getMessage());
                 } catch (Exception ex) {
@@ -272,6 +286,7 @@ public class ServerNodeManager implements InitializingBean {
     }
 
     class MasterDataListener implements SubscribeListener {
+
         @Override
         public void notify(Event event) {
             final String path = event.path();
@@ -328,28 +343,57 @@ public class ServerNodeManager implements InitializingBean {
                 MASTER_SIZE = nodes.size();
                 MASTER_SLOT = index;
             } else {
-                logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
+                logger.warn("current addr:{} is not in active master list",
+                        masterConfig.getMasterAddress());
             }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
+            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
+                    MASTER_SLOT, masterConfig.getMasterAddress());
         } finally {
             masterLock.unlock();
         }
     }
 
     /**
-     * sync worker group nodes
+     * sync worker group nodes from registry center
      *
      * @param workerGroup worker group
      * @param nodes worker nodes
+     * @param type event type
+     */
+    private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
+                                                  Type type) {
+        try {
+            if (type == Type.REMOVE) {
+                if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
+                    logger.warn("cannot remove worker group {}, not in active list", workerGroup);
+                    return;
+                }
+                registryWorkerGroupNodes.remove(workerGroup);
+            } else {
+                Set<String> workerNodes = registryWorkerGroupNodes
+                        .getOrDefault(workerGroup, new HashSet<>());
+                workerNodes.clear();
+                workerNodes.addAll(nodes);
+                registryWorkerGroupNodes.put(workerGroup, workerNodes);
+            }
+        } finally {
+            refreshWorkerGroupNodes();
+        }
+    }
+
+    /**
+     * refresh worker group nodes
      */
-    private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
+    private void refreshWorkerGroupNodes() {
         workerGroupWriteLock.lock();
         try {
-            Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
-            workerNodes.clear();
-            workerNodes.addAll(nodes);
-            workerGroupNodes.put(workerGroup, workerNodes);
+            workerGroupNodes.clear();
+            workerGroupNodes.putAll(registryWorkerGroupNodes);
+            workerGroupNodes.putAll(dbWorkerGroupNodes);
+            logger.debug("refresh worker group nodes, current list: {}",
+                    Arrays.toString(workerGroupNodes.keySet().toArray()));
         } finally {
+            notifyWorkerInfoChangeListeners();
             workerGroupWriteLock.unlock();
         }
     }
@@ -414,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerNodeInfo.clear();
             for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+                workerNodeInfo.put(entry.getKey(),
+                        JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
             }
         } finally {
             workerNodeInfoWriteLock.unlock();
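Before this fix, `syncWorkerGroupNodes` only ever put or overwrote entries in `workerGroupNodes`. As a result, a worker group that disappeared from the registry or the database was never removed. The diff keeps two source maps instead (`registryWorkerGroupNodes` and `dbWorkerGroupNodes`) and rebuilds the combined map from both in `refreshWorkerGroupNodes()`. The following is a stripped-down sketch of that idea. `WorkerGroupView` is a hypothetical class. It uses a boolean in place of the registry `Type`, and it leaves out the listeners, alerts and worker heartbeat handling that the real `ServerNodeManager` performs.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class WorkerGroupView {

    // Source maps, named after the fields added in the diff: workerGroup -> worker addresses.
    private final Map<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
    // Combined view that callers read.
    private final Map<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Registry event for a group; as in the diff, a remove drops the whole group entry. */
    void onRegistryEvent(String group, Collection<String> currentNodes, boolean remove) {
        if (remove) {
            registryWorkerGroupNodes.remove(group);
        } else {
            registryWorkerGroupNodes.put(group, new HashSet<>(currentNodes));
        }
        refresh();
    }

    /** Periodic DB sync: the new snapshot replaces the previous one entirely. */
    void onDbSync(Map<String, Set<String>> snapshot) {
        dbWorkerGroupNodes.clear();
        snapshot.forEach((group, nodes) -> dbWorkerGroupNodes.put(group, new HashSet<>(nodes)));
        refresh();
    }

    /** Rebuilds the combined view from scratch, so groups gone from both sources disappear. */
    private void refresh() {
        lock.writeLock().lock();
        try {
            workerGroupNodes.clear();
            workerGroupNodes.putAll(registryWorkerGroupNodes);
            // On a name clash the DB entry wins, matching the putAll order in the diff.
            workerGroupNodes.putAll(dbWorkerGroupNodes);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Sorted snapshot of the group names currently visible. */
    Set<String> groupNames() {
        lock.readLock().lock();
        try {
            return new TreeSet<>(workerGroupNodes.keySet());
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Rebuilding the combined map on every change costs more than patching it in place. In exchange, the combined view is always a function of the two sources, so it cannot hold a stale group.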