You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/10/27 05:21:57 UTC

[dolphinscheduler] branch 3.1.1-prepare updated: cherry-pick [Improvement][WorkerGroup]Remove workerGroup in registry #12217

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.1-prepare by this push:
     new a17d0cc5d2 cherry-pick [Improvement][WorkerGroup]Remove workerGroup in registry #12217
a17d0cc5d2 is described below

commit a17d0cc5d284feabe11c842f80e62d39d47ec710
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Sep 30 13:58:53 2022 +0800

    cherry-pick [Improvement][WorkerGroup]Remove workerGroup in registry #12217
---
 docs/docs/en/architecture/configuration.md         |   1 -
 docs/docs/zh/architecture/configuration.md         |   1 -
 .../api/service/impl/WorkerGroupServiceImpl.java   |  93 +++-------
 .../api/controller/WorkerGroupControllerTest.java  |   2 +-
 .../server/master/registry/ServerNodeManager.java  | 187 +++++++--------------
 .../service/registry/RegistryClient.java           |  75 ++++-----
 .../server/worker/config/WorkerConfig.java         |  34 ++--
 .../worker/registry/WorkerRegistryClient.java      |  30 ++--
 .../server/worker/task/WorkerHeartBeatTask.java    |  24 +--
 .../src/main/resources/application.yaml            |   7 -
 .../worker/registry/WorkerRegistryClientTest.java  |  31 ++--
 11 files changed, 162 insertions(+), 323 deletions(-)

diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index 279411ef75..6a0438aebb 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -291,7 +291,6 @@ Location: `worker-server/conf/application.yaml`
 |worker.tenant-auto-create|true|tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.|
 |worker.max-cpu-load-avg|-1|worker max CPU load avg, only higher than the system CPU load average, worker server can be dispatched tasks. default value -1: the number of CPU cores * 2|
 |worker.reserved-memory|0.3|worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G|
-|worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test' <br> worker will join corresponding group according to this config when startup|
 |worker.alert-listen-host|localhost|the alert listen host of worker|
 |worker.alert-listen-port|50052|the alert listen port of worker|
 |worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index be822c89a2..184053b68a 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -285,7 +285,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
 |worker.tenant-auto-create|true|租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。|
 |worker.max-cpu-load-avg|-1|worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2|
 |worker.reserved-memory|0.3|worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G|
-|worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test' <br> worker启动时会根据该配置自动加入对应的分组|
 |worker.alert-listen-host|localhost|alert监听host|
 |worker.alert-listen-port|50052|alert监听端口|
 |worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 4e27998019..497df26ca6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -41,13 +39,13 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -174,10 +172,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
      * @return boolean
      */
     private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
-        Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
         if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
             return null;
         }
+        Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER);
         for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
             if (!serverMaps.containsKey(addr)) {
                 return addr;
@@ -205,11 +203,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         Result result = new Result();
         List<WorkerGroup> workerGroups;
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
-            workerGroups = getWorkerGroups(true, null);
+            workerGroups = getWorkerGroups(null);
         } else {
             Set<Integer> ids = resourcePermissionCheckService
                     .userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
-            workerGroups = getWorkerGroups(true, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
+            workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
         }
         List<WorkerGroup> resultDataList = new ArrayList<>();
         int total = 0;
@@ -255,20 +253,15 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         Map<String, Object> result = new HashMap<>();
         List<WorkerGroup> workerGroups;
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
-            workerGroups = getWorkerGroups(false, null);
+            workerGroups = getWorkerGroups(null);
         } else {
             Set<Integer> ids = resourcePermissionCheckService
                     .userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
-            workerGroups = getWorkerGroups(false, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
+            workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
         }
         List<String> availableWorkerGroupList = workerGroups.stream()
                 .map(WorkerGroup::getName)
                 .collect(Collectors.toList());
-        int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP);
-        if (index > -1) {
-            availableWorkerGroupList.remove(index);
-            availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP);
-        }
         result.put(Constants.DATA_LIST, availableWorkerGroupList);
         putMsg(result, Status.SUCCESS);
         return result;
@@ -277,10 +270,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     /**
      * get worker groups
      *
-     * @param isPaging whether paging
      * @return WorkerGroup list
      */
-    private List<WorkerGroup> getWorkerGroups(boolean isPaging, List<Integer> ids) {
+    private List<WorkerGroup> getWorkerGroups(List<Integer> ids) {
         // worker groups from database
         List<WorkerGroup> workerGroups;
         if (ids != null) {
@@ -289,63 +281,23 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             workerGroups = workerGroupMapper.queryAllWorkerGroup();
         }
 
-        // worker groups from zookeeper
-        String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-        Collection<String> workerGroupList = null;
-        try {
-            workerGroupList = registryClient.getChildrenKeys(workerPath);
-        } catch (Exception e) {
-            logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
+        Optional<Boolean> containDefaultWorkerGroups = workerGroups.stream()
+                .map(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName())).findAny();
+        if (!containDefaultWorkerGroups.isPresent() || !containDefaultWorkerGroups.get()) {
+            // No default WorkerGroup was found, so add one whose address list contains every active worker node.
+            Set<String> activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER);
+            WorkerGroup defaultWorkerGroup = new WorkerGroup();
+            defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
+            defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
+            defaultWorkerGroup.setCreateTime(new Date());
+            defaultWorkerGroup.setUpdateTime(new Date());
+            defaultWorkerGroup.setSystemDefault(true);
+            workerGroups.add(defaultWorkerGroup);
         }
 
-        if (CollectionUtils.isEmpty(workerGroupList)) {
-            if (CollectionUtils.isEmpty(workerGroups) && !isPaging) {
-                WorkerGroup wg = new WorkerGroup();
-                wg.setName(Constants.DEFAULT_WORKER_GROUP);
-                workerGroups.add(wg);
-            }
-            return workerGroups;
-        }
-        Map<String, WorkerGroup> workerGroupsMap = null;
-        if (workerGroups.size() != 0) {
-            workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName,
-                    workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
-        }
-        for (String workerGroup : workerGroupList) {
-            String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
-            Collection<String> childrenNodes = null;
-            try {
-                childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
-            } catch (Exception e) {
-                logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
-            }
-            if (childrenNodes == null || childrenNodes.isEmpty()) {
-                continue;
-            }
-            WorkerGroup wg = new WorkerGroup();
-            handleAddrList(wg, workerGroup, childrenNodes);
-            wg.setName(workerGroup);
-            if (isPaging) {
-                String registeredValue =
-                        registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
-                WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
-                wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
-                wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
-                wg.setSystemDefault(true);
-                if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
-                    wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
-                    workerGroups.remove(workerGroupsMap.get(workerGroup));
-                }
-            }
-            workerGroups.add(wg);
-        }
         return workerGroups;
     }
 
-    protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection<String> childrenNodes) {
-        wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
-    }
-
     /**
      * delete worker group by id
      *
@@ -368,6 +320,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         List<ProcessInstance> processInstances = processInstanceMapper
                 .queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
+            List<Integer> processInstanceIds =
+                    processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList());
+            logger.warn(
+                    "Delete worker group failed because there are {} processInstances are using it, processInstanceIds:{}.",
+                    processInstances.size(), processInstanceIds);
             putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
             return result;
         }
@@ -385,7 +342,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     @Override
     public Map<String, Object> getWorkerAddressList() {
         Map<String, Object> result = new HashMap<>();
-        Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
+        Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER);
         result.put(Constants.DATA_LIST, serverNodeList);
         putMsg(result, Status.SUCCESS);
         return result;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index 72bad1e686..afff430388 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -67,7 +67,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
         Map<String, String> serverMaps = new HashMap<>();
         serverMaps.put("192.168.0.1", "192.168.0.1");
         serverMaps.put("192.168.0.2", "192.168.0.2");
-        PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
+        Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
 
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
         paramsMap.add("name","cxc_work_group");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 75453c8649..e97738cbdf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,10 +17,10 @@
 
 package org.apache.dolphinscheduler.server.master.registry;
 
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 
-import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import javax.annotation.PreDestroy;
 
@@ -87,16 +89,6 @@ public class ServerNodeManager implements InitializingBean {
      */
     private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
 
-    /**
-     * worker group nodes from registry center, workerGroup -> ips
-     */
-    private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
-
-    /**
-     * worker group nodes from db, workerGroup -> ips
-     */
-    private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
-
     /**
      * master nodes
      */
@@ -112,28 +104,14 @@ public class ServerNodeManager implements InitializingBean {
     @Autowired
     private RegistryClient registryClient;
 
-    /**
-     * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
-     */
-    private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
-
-    /**
-     * worker group mapper
-     */
     @Autowired
     private WorkerGroupMapper workerGroupMapper;
 
     private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
 
-    /**
-     * alert dao
-     */
     @Autowired
     private AlertDao alertDao;
 
-    /**
-     * master config
-     */
     @Autowired
     private MasterConfig masterConfig;
 
@@ -180,13 +158,8 @@ public class ServerNodeManager implements InitializingBean {
     public void load() {
         // master nodes from zookeeper
         updateMasterNodes();
-
-        // worker group nodes from zookeeper
-        Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
-        for (String workerGroup : workerGroups) {
-            syncWorkerGroupNodesFromRegistry(workerGroup,
-                    registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
-        }
+        updateWorkerNodes();
+        updateWorkerGroupMappings();
     }
 
     /**
@@ -197,42 +170,19 @@ public class ServerNodeManager implements InitializingBean {
         @Override
         public void run() {
             try {
-                dbWorkerGroupNodes.clear();
 
                 // sync worker node info
-                Map<String, String> registryWorkerNodeMap = registryClient
-                        .getServerMaps(NodeType.WORKER, true);
-                syncAllWorkerNodeInfo(registryWorkerNodeMap);
-                // sync worker group nodes from database
-                List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
-                if (CollectionUtils.isNotEmpty(workerGroupList)) {
-                    for (WorkerGroup wg : workerGroupList) {
-                        String workerGroupName = wg.getName();
-                        Set<String> workerAddress = getWorkerAddressByWorkerGroup(
-                                registryWorkerNodeMap, wg);
-                        if (!workerAddress.isEmpty()) {
-                            Set<String> workerNodes = dbWorkerGroupNodes
-                                    .getOrDefault(workerGroupName, new HashSet<>());
-                            workerNodes.clear();
-                            workerNodes.addAll(workerAddress);
-                            dbWorkerGroupNodes.put(workerGroupName, workerNodes);
-                        }
-                    }
-                }
+                updateWorkerNodes();
+                updateWorkerGroupMappings();
+                notifyWorkerInfoChangeListeners();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
-            } finally {
-                refreshWorkerGroupNodes();
             }
         }
     }
 
-<<<<<<< HEAD
     protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
                                                         WorkerGroup wg) {
-=======
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
->>>>>>> 2e61c76c2 ([Improvement] Add remote task model (#11767))
         Set<String> nodes = new HashSet<>();
         String[] addrs = wg.getAddrList().split(Constants.COMMA);
         for (String addr : addrs) {
@@ -256,30 +206,18 @@ public class ServerNodeManager implements InitializingBean {
             if (registryClient.isWorkerPath(path)) {
                 try {
                     String[] parts = path.split("/");
-                    if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
-                        throw new IllegalArgumentException(
-                                String.format("worker group path : %s is not valid, ignore", path));
-                    }
-                    final String workerGroupName = parts[parts.length - 2];
                     final String workerAddress = parts[parts.length - 1];
 
+                    // todo: update workerNodeInfo
                     logger.debug("received subscribe event : {}", event);
-                    Collection<String> currentNodes = registryClient
-                            .getWorkerGroupNodesDirectly(workerGroupName);
-                    syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
-
                     if (type == Type.ADD) {
-                        logger.info("worker group node : {} added, currentNodes : {}", path,
-                                currentNodes);
+                        logger.info("Worker: {} added, currentNode : {}", path, workerAddress);
                     } else if (type == Type.REMOVE) {
-                        logger.info("worker group node : {} down.", path);
+                        logger.info("Worker node : {} down.", path);
                         alertDao.sendServerStoppedAlert(1, path, "WORKER");
                     } else if (type == Type.UPDATE) {
-                        syncSingleWorkerNodeInfo(workerAddress,
-                                JSONUtils.parseObject(data, WorkerHeartBeat.class));
+                        syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
                     }
-                } catch (IllegalArgumentException ex) {
-                    logger.warn(ex.getMessage());
                 } catch (Exception ex) {
                     logger.error("WorkerGroupListener capture data change and get data failed", ex);
                 }
@@ -330,6 +268,50 @@ public class ServerNodeManager implements InitializingBean {
 
     }
 
+    private void updateWorkerNodes() {
+        workerGroupWriteLock.lock();
+        try {
+            Map<String, String> workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER);
+            for (Map.Entry<String, String> entry : workerNodeMaps.entrySet()) {
+                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+            }
+        } finally {
+            workerGroupWriteLock.unlock();
+        }
+    }
+
+    private void updateWorkerGroupMappings() {
+        List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
+        Map<String, Set<String>> tmpWorkerGroupMappings = new HashMap<>();
+        try {
+            workerNodeInfoReadLock.lock();
+            for (WorkerGroup workerGroup : workerGroups) {
+                String workerGroupName = workerGroup.getName();
+                String[] workerAddresses = workerGroup.getAddrList().split(Constants.COMMA);
+                if (ArrayUtils.isEmpty(workerAddresses)) {
+                    continue;
+                }
+                Set<String> activeWorkerNodes = Arrays.stream(workerAddresses)
+                        .filter(workerNodeInfo::containsKey).collect(Collectors.toSet());
+                tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes);
+            }
+            if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
+                tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, workerNodeInfo.keySet());
+            }
+        } finally {
+            workerNodeInfoReadLock.unlock();
+        }
+
+        workerGroupWriteLock.lock();
+        try {
+            workerGroupNodes.clear();
+            workerGroupNodes.putAll(tmpWorkerGroupMappings);
+            notifyWorkerInfoChangeListeners();
+        } finally {
+            workerGroupWriteLock.unlock();
+        }
+    }
+
     /**
      * sync master nodes
      *
@@ -349,63 +331,13 @@ public class ServerNodeManager implements InitializingBean {
                 logger.warn("current addr:{} is not in active master list",
                         masterConfig.getMasterAddress());
             }
-<<<<<<< HEAD
             logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
                     MASTER_SLOT, masterConfig.getMasterAddress());
-=======
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
-                    masterConfig.getMasterAddress());
->>>>>>> 2e61c76c2 ([Improvement] Add remote task model (#11767))
         } finally {
             masterLock.unlock();
         }
     }
 
-    /**
-     * sync worker group nodes from registry center
-     *
-     * @param workerGroup worker group
-     * @param nodes worker nodes
-     * @param type event type
-     */
-    private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
-                                                  Type type) {
-        try {
-            if (type == Type.REMOVE) {
-                if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
-                    logger.warn("cannot remove worker group {}, not in active list", workerGroup);
-                    return;
-                }
-                registryWorkerGroupNodes.remove(workerGroup);
-            } else {
-                Set<String> workerNodes = registryWorkerGroupNodes
-                        .getOrDefault(workerGroup, new HashSet<>());
-                workerNodes.clear();
-                workerNodes.addAll(nodes);
-                registryWorkerGroupNodes.put(workerGroup, workerNodes);
-            }
-        } finally {
-            refreshWorkerGroupNodes();
-        }
-    }
-
-    /**
-     * refresh worker group nodes
-     */
-    private void refreshWorkerGroupNodes() {
-        workerGroupWriteLock.lock();
-        try {
-            workerGroupNodes.clear();
-            workerGroupNodes.putAll(registryWorkerGroupNodes);
-            workerGroupNodes.putAll(dbWorkerGroupNodes);
-            logger.debug("refresh worker group nodes, current list: {}",
-                    Arrays.toString(workerGroupNodes.keySet().toArray()));
-        } finally {
-            notifyWorkerInfoChangeListeners();
-            workerGroupWriteLock.unlock();
-        }
-    }
-
     public Map<String, Set<String>> getWorkerGroupNodes() {
         workerGroupReadLock.lock();
         try {
@@ -466,8 +398,7 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerNodeInfo.clear();
             for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(),
-                        JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
             }
         } finally {
             workerNodeInfoWriteLock.unlock();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 508ba293aa..2a5b2203f2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -17,9 +17,12 @@
 
 package org.apache.dolphinscheduler.service.registry;
 
-import com.google.common.base.Strings;
-import lombok.NonNull;
-import org.apache.commons.lang3.StringUtils;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,28 +34,29 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
 import org.apache.dolphinscheduler.registry.api.Registry;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.registry.api.SubscribeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.dolphinscheduler.common.Constants.COLON;
-import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Strings;
 
 @Component
 public class RegistryClient {
@@ -91,7 +95,7 @@ public class RegistryClient {
     }
 
     public List<Server> getServerList(NodeType nodeType) {
-        Map<String, String> serverMaps = getServerMaps(nodeType, false);
+        Map<String, String> serverMaps = getServerMaps(nodeType);
         String parentPath = rootNodePath(nodeType);
 
         List<Server> serverList = new ArrayList<>();
@@ -123,26 +127,24 @@ public class RegistryClient {
             server.setZkDirectory(parentPath + "/" + serverPath);
             // set host and port
             String[] hostAndPort = serverPath.split(COLON);
-            String[] hosts = hostAndPort[0].split(DIVISION_STRING);
             // fetch the last one
-            server.setHost(hosts[hosts.length - 1]);
+            server.setHost(hostAndPort[0]);
             server.setPort(Integer.parseInt(hostAndPort[1]));
             serverList.add(server);
         }
         return serverList;
     }
 
-    public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
+    /**
+     * Returns a map from each registered server node name (host:port) to the value stored at that node.
+     */
+    public Map<String, String> getServerMaps(NodeType nodeType) {
         Map<String, String> serverMap = new HashMap<>();
         try {
             String path = rootNodePath(nodeType);
             Collection<String> serverList = getServerNodes(nodeType);
             for (String server : serverList) {
-                String host = server;
-                if (nodeType == NodeType.WORKER && hostOnly) {
-                    host = server.split(SINGLE_SLASH)[1];
-                }
-                serverMap.putIfAbsent(host, get(path + SINGLE_SLASH + server));
+                serverMap.putIfAbsent(server, get(path + SINGLE_SLASH + server));
             }
         } catch (Exception e) {
             logger.error("get server list failed", e);
@@ -152,7 +154,7 @@ public class RegistryClient {
     }
 
     public boolean checkNodeExists(String host, NodeType nodeType) {
-        return getServerMaps(nodeType, true).keySet()
+        return getServerMaps(nodeType).keySet()
                 .stream()
                 .anyMatch(it -> it.contains(host));
     }
@@ -161,14 +163,6 @@ public class RegistryClient {
         return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
     }
 
-    public Collection<String> getWorkerGroupDirectly() {
-        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
-    }
-
-    public Collection<String> getWorkerGroupNodesDirectly(String workerGroup) {
-        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
-    }
-
     /**
      * get host ip:port, path format: parentPath/ip:port
      *
@@ -241,14 +235,9 @@ public class RegistryClient {
         return registry.children(key);
     }
 
-    public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
+    public Set<String> getServerNodeSet(NodeType nodeType) {
         try {
-            return getServerNodes(nodeType).stream().map(server -> {
-                if (nodeType == NodeType.WORKER && hostOnly) {
-                    return server.split(SINGLE_SLASH)[1];
-                }
-                return server;
-            }).collect(Collectors.toSet());
+            return new HashSet<>(getServerNodes(nodeType));
         } catch (Exception e) {
             throw new RegistryException("Failed to get server node: " + nodeType, e);
         }
@@ -272,13 +261,7 @@ public class RegistryClient {
 
     private Collection<String> getServerNodes(NodeType nodeType) {
         final String path = rootNodePath(nodeType);
-        final Collection<String> serverList = getChildrenKeys(path);
-        if (nodeType != NodeType.WORKER) {
-            return serverList;
-        }
-        return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group)
-                .stream()
-                .map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList());
+        return getChildrenKeys(path);
     }
 
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a7e6927415..96f00a1f30 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,11 +17,15 @@
 
 package org.apache.dolphinscheduler.server.worker.config;
 
-import com.google.common.collect.Sets;
-import lombok.Data;
-import org.apache.commons.collections4.CollectionUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+
+import java.time.Duration;
+
+import lombok.Data;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -30,12 +34,6 @@ import org.springframework.validation.Errors;
 import org.springframework.validation.Validator;
 import org.springframework.validation.annotation.Validated;
 
-import java.time.Duration;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
 @Data
 @Validated
 @Configuration
@@ -52,7 +50,6 @@ public class WorkerConfig implements Validator {
     private boolean tenantDistributedUser = false;
     private int maxCpuLoadAvg = -1;
     private double reservedMemory = 0.3;
-    private Set<String> groups = Sets.newHashSet("default");
     private String alertListenHost = "localhost";
     private int alertListenPort = 50052;
     private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
@@ -61,7 +58,7 @@ public class WorkerConfig implements Validator {
      * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
      */
     private String workerAddress;
-    private Set<String> workerGroupRegistryPaths;
+    private String workerRegistryPath;
 
     @Override
     public boolean supports(Class<?> clazz) {
@@ -82,17 +79,7 @@ public class WorkerConfig implements Validator {
         }
         workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
 
-        workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
-        if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
-            errors.rejectValue("groups", null, "should not be empty");
-        }
-
-        Set<String> workerRegistryPaths = workerConfig.getGroups()
-                .stream()
-                .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
-                .collect(Collectors.toSet());
-
-        workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
+        workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
         printConfig();
     }
 
@@ -105,11 +92,10 @@ public class WorkerConfig implements Validator {
         logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
         logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
         logger.info("Worker config: reservedMemory -> {}", reservedMemory);
-        logger.info("Worker config: groups -> {}", groups);
         logger.info("Worker config: alertListenHost -> {}", alertListenHost);
         logger.info("Worker config: alertListenPort -> {}", alertListenPort);
         logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
         logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
-        logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
+        logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index d2147d1559..169db04a82 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -17,26 +17,28 @@
 
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryException;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
 import java.io.IOException;
 
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 @Slf4j
 @Service
@@ -56,7 +58,6 @@ public class WorkerRegistryClient implements AutoCloseable {
 
     private WorkerHeartBeatTask workerHeartBeatTask;
 
-
     @PostConstruct
     public void initWorkRegistry() {
         this.workerHeartBeatTask = new WorkerHeartBeatTask(
@@ -80,22 +81,19 @@ public class WorkerRegistryClient implements AutoCloseable {
      */
     private void registry() {
         WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
+        String workerZKPath = workerConfig.getWorkerRegistryPath();
+        // remove before persist
+        registryClient.remove(workerZKPath);
+        registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
+        log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
 
-        for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
-            // remove before persist
-            registryClient.remove(workerZKPath);
-            registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
-            log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
-        }
-
-        while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
+        while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
             ThreadUtils.sleep(SLEEP_TIME_MILLIS);
         }
 
         // sleep 1s, waiting master failover remove
         ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
 
-
         workerHeartBeatTask.start();
         log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index d2a9ca008c..bf81bbb945 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.task;
 
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
@@ -30,6 +28,9 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.function.Supplier;
 
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
 
@@ -59,7 +60,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
         double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
         int execThreads = workerConfig.getExecThreads();
         int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
-        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, execThreads, workerWaitingTaskCount);
+        int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
+                execThreads, workerWaitingTaskCount);
 
         return WorkerHeartBeat.builder()
                 .startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -82,11 +84,11 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
     @Override
     public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
         String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
-        for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
-            registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
-        }
-        log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
-                workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
+        String workerRegistryPath = workerConfig.getWorkerRegistryPath();
+        registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson);
+        log.info(
+                "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}",
+                workerRegistryPath, workerHeartBeatJson);
     }
 
     public int getServerStatus(double loadAverage,
@@ -96,11 +98,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
                                int workerExecThreadCount,
                                int workerWaitingTaskCount) {
         if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
-            log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+            log.warn(
+                    "current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
                     loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
             return Constants.ABNORMAL_NODE_STATUS;
         } else if (workerWaitingTaskCount > workerExecThreadCount) {
-            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
+            log.warn("current waiting task count {} is large than worker thread count {}, worker is busy",
+                    workerWaitingTaskCount, workerExecThreadCount);
             return Constants.BUSY_NODE_STATUE;
         } else {
             return Constants.NORMAL_NODE_STATUS;
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index 4effad23b1..d64f4fde2c 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -68,13 +68,6 @@ worker:
   max-cpu-load-avg: -1
   # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
   reserved-memory: 0.3
-  # for multiple worker groups, use hyphen before group name, e.g.
-  # groups:
-  #   - default
-  #   - group1
-  #   - group2
-  groups:
-    - default
   # alert server listen host
   alert-listen-host: localhost
   alert-listen-port: 50052
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 1d8e7ea516..f39b8a9ad7 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import static org.mockito.BDDMockito.given;
 
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -27,15 +28,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +44,7 @@ import com.google.common.collect.Sets;
 /**
  * worker registry test
  */
-@RunWith(MockitoJUnitRunner.Silent.class)
+@ExtendWith(MockitoExtension.class)
 public class WorkerRegistryClientTest {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@@ -78,28 +78,17 @@ public class WorkerRegistryClientTest {
         // workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
     }
 
-    @Before
-    public void before() {
-        given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
-        // given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
-        // scheduleAtFixedRate
-        given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(),
-                Mockito.any(TimeUnit.class))).willReturn(null);
-
-    }
-
     @Test
     public void testStart() {
-        workerRegistryClient.initWorkRegistry();
-
-        given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
-
-        given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
 
+        given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
         given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
+        given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
 
+        workerRegistryClient.initWorkRegistry();
         workerRegistryClient.start();
 
+        Assertions.assertTrue(true);
     }
 
     @Test