You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/11/07 07:18:14 UTC

[dolphinscheduler] branch dev updated: Refactor registry plugin and simplify its usage (#6712)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5400abc  Refactor registry plugin and simplify its usage (#6712)
5400abc is described below

commit 5400abc74fdb16717d22c15f964cad1a6dfd899e
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sun Nov 7 15:18:10 2021 +0800

    Refactor registry plugin and simplify its usage (#6712)
---
 .../registry.properties => .github/CODEOWNERS      |   8 +-
 .../conf/dolphinscheduler/registry.properties.tpl  |   8 -
 docker/build/startup-init-conf.sh                  |   1 -
 docker/docker-swarm/config.env.sh                  |   1 -
 docker/docker-swarm/docker-compose.yml             |   2 +-
 .../dolphinscheduler/templates/_helpers.tpl        |   8 +-
 .../api/service/impl/MonitorServiceImpl.java       |  75 ++--
 .../api/service/impl/WorkerGroupServiceImpl.java   |  29 +-
 .../api/utils/RegistryCenterUtils.java             |  82 ----
 .../api/controller/AbstractControllerTest.java     |  17 +-
 .../api/controller/SchedulerControllerTest.java    |   3 -
 .../api/controller/TaskInstanceControllerTest.java |   3 -
 .../api/controller/TenantControllerTest.java       |   3 -
 .../api/controller/WorkerGroupControllerTest.java  |   7 +-
 .../api/service/WorkerGroupServiceTest.java        |  96 +----
 .../api/utils/RegistryCenterUtilsTest.java         |  43 --
 .../apache/dolphinscheduler/common/Constants.java  |   2 -
 .../dolphinscheduler/common/enums/NodeType.java    |  11 +-
 .../src/main/provisio/dolphinscheduler.xml         |   5 -
 .../ZookeeperConnectionStateListener.java          |  56 ---
 .../zookeeper/ZookeeperRegistryPlugin.java         |  34 --
 .../dolphinscheduler-registry-api/pom.xml          |  32 ++
 .../registry/api/ConnectionListener.java           |  25 ++
 .../registry/api/ConnectionState.java              |  27 ++
 .../dolphinscheduler/registry/api/Event.java       |  48 +++
 .../dolphinscheduler/registry/api/Registry.java    |  48 +++
 .../registry/api/RegistryException.java            |  31 ++
 .../registry/api/RegistryFactory.java              |  26 ++
 .../registry/api/RegistryFactoryLoader.java        |  35 ++
 .../registry/api/SubscribeListener.java            |  24 ++
 .../dolphinscheduler-registry-zookeeper/pom.xml    |  18 +-
 .../registry/zookeeper/ZookeeperConfiguration.java |   0
 .../ZookeeperConnectionStateListener.java          |  54 +++
 .../registry/zookeeper/ZookeeperRegistry.java      | 229 ++++------
 .../zookeeper/ZookeeperRegistryFactory.java        |  14 +-
 .../registry/zookeeper/ZookeeperRegistryTest.java  |  23 +-
 .../dolphinscheduler-registry-plugins/pom.xml      |  36 ++
 .../pom.xml                                        |  25 +-
 .../master/registry/MasterRegistryClient.java      |  73 ++--
 .../registry/MasterRegistryDataListener.java       |  40 +-
 .../server/master/registry/ServerNodeManager.java  |  58 +--
 .../server/monitor/RegistryMonitorImpl.java        |  22 +-
 .../server/registry/HeartBeatTask.java             |  10 +-
 .../worker/processor/TaskCallbackService.java      |   6 -
 .../worker/registry/WorkerRegistryClient.java      |   7 +-
 .../src/main/resources/config/install_config.conf  |   5 -
 .../master/dispatch/ExecutorDispatcherTest.java    |   2 +-
 .../master/registry/MasterRegistryClientTest.java  |  16 +-
 dolphinscheduler-service/pom.xml                   |   4 +
 .../service/registry/RegistryCenter.java           | 243 -----------
 .../service/registry/RegistryClient.java           | 467 ++++++++-------------
 .../src/main/resources/registry.properties         |  13 -
 .../service/registry/RegistryClientTest.java       |  74 ----
 .../service/registry/RegistryPluginTest.java       |  45 --
 .../spi/DolphinSchedulerPlugin.java                |   9 -
 .../spi/register/ConnectStateListener.java         |  23 -
 .../spi/register/DataChangeEvent.java              |  37 --
 .../spi/register/ListenerManager.java              |  66 ---
 .../dolphinscheduler/spi/register/Registry.java    | 102 -----
 .../spi/register/RegistryConnectListener.java      |  23 -
 .../spi/register/RegistryConnectState.java         |  37 --
 .../spi/register/RegistryException.java            |  32 --
 .../spi/register/RegistryFactory.java              |  34 --
 .../spi/register/RegistryPluginManager.java        |  82 ----
 .../spi/register/SubscribeListener.java            |  30 --
 .../dolphinscheduler/server/StandaloneServer.java  |   9 -
 pom.xml                                            |  23 +-
 script/env/dolphinscheduler_env.sh                 |   2 +-
 68 files changed, 882 insertions(+), 1901 deletions(-)

diff --git a/dolphinscheduler-standalone-server/src/main/resources/registry.properties b/.github/CODEOWNERS
similarity index 74%
rename from dolphinscheduler-standalone-server/src/main/resources/registry.properties
rename to .github/CODEOWNERS
index 3c33799..9def4be 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/registry.properties
+++ b/.github/CODEOWNERS
@@ -15,8 +15,6 @@
 # limitations under the License.
 #
 
-# This file is only to override the production configurations in standalone server.
-
-registry.plugin.dir=./dolphinscheduler-dist/target/dolphinscheduler-dist-2.0.0-SNAPSHOT/lib/plugin/registry/zookeeper
-registry.plugin.name=zookeeper
-registry.servers=127.0.0.1:2181
+dolphinscheduler/dolphinscheduler-e2e @kezhenxu94
+dolphinscheduler/dolphinscheduler-registry @kezhenxu94
+dolphinscheduler/dolphinscheduler-standalone-server @kezhenxu94
diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl b/docker/build/conf/dolphinscheduler/registry.properties.tpl
index 40836f5..9ee8add 100644
--- a/docker/build/conf/dolphinscheduler/registry.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl
@@ -15,13 +15,5 @@
 # limitations under the License.
 #
 
-#registry.plugin.dir config the Registry Plugin dir.
-registry.plugin.dir=${REGISTRY_PLUGIN_DIR}
-
 registry.plugin.name=${REGISTRY_PLUGIN_NAME}
 registry.servers=${REGISTRY_SERVERS}
-
-#maven.local.repository=/usr/local/localRepository
-
-#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
-#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
diff --git a/docker/build/startup-init-conf.sh b/docker/build/startup-init-conf.sh
index 35f70bb..4717834 100755
--- a/docker/build/startup-init-conf.sh
+++ b/docker/build/startup-init-conf.sh
@@ -37,7 +37,6 @@ export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
 #============================================================================
 # Registry
 #============================================================================
-export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry"}
 export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"}
 export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"}
 
diff --git a/docker/docker-swarm/config.env.sh b/docker/docker-swarm/config.env.sh
index afc09b0..0c79a04 100755
--- a/docker/docker-swarm/config.env.sh
+++ b/docker/docker-swarm/config.env.sh
@@ -39,7 +39,6 @@ DATABASE_PARAMS=characterEncoding=utf8
 #============================================================================
 # Registry
 #============================================================================
-REGISTRY_PLUGIN_DIR=lib/plugin/registry
 REGISTRY_PLUGIN_NAME=zookeeper
 REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181
 
diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml
index 3f63e79..2f67ae8 100644
--- a/docker/docker-swarm/docker-compose.yml
+++ b/docker/docker-swarm/docker-compose.yml
@@ -140,4 +140,4 @@ volumes:
   dolphinscheduler-worker-data:
   dolphinscheduler-logs:
   dolphinscheduler-shared-local:
-  dolphinscheduler-resource-local:
\ No newline at end of file
+  dolphinscheduler-resource-local:
diff --git a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
index 2b0786f..5ef83a7 100644
--- a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
+++ b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
@@ -166,12 +166,6 @@ Create a database environment variables.
 Create a registry environment variables.
 */}}
 {{- define "dolphinscheduler.registry.env_vars" -}}
-- name: REGISTRY_PLUGIN_DIR
-  {{- if .Values.zookeeper.enabled }}
-  value: "lib/plugin/registry"
-  {{- else }}
-  value: {{ .Values.externalRegistry.registryPluginDir }}
-  {{- end }}
 - name: REGISTRY_PLUGIN_NAME
   {{- if .Values.zookeeper.enabled }}
   value: "zookeeper"
@@ -239,4 +233,4 @@ Create a fsFileResourcePersistence volumeMount.
 - mountPath: {{ default "/dolphinscheduler" .Values.common.configmap.RESOURCE_UPLOAD_PATH | quote }}
   name: {{ include "dolphinscheduler.fullname" . }}-fs-file
 {{- end -}}
-{{- end -}}
\ No newline at end of file
+{{- end -}}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
index cb3b0b2..ccec49b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
@@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.MonitorService;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.model.WorkerServerModel;
 import org.apache.dolphinscheduler.dao.MonitorDBDao;
 import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.HashMap;
 import java.util.List;
@@ -48,6 +48,9 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
     @Autowired
     private MonitorDBDao monitorDBDao;
 
+    @Autowired
+    private RegistryClient registryClient;
+
     /**
      * query database state
      *
@@ -55,7 +58,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
      * @return data base state
      */
     @Override
-    public Map<String,Object> queryDatabaseState(User loginUser) {
+    public Map<String, Object> queryDatabaseState(User loginUser) {
         Map<String, Object> result = new HashMap<>();
 
         List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
@@ -74,13 +77,13 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
      * @return master information list
      */
     @Override
-    public Map<String,Object> queryMaster(User loginUser) {
+    public Map<String, Object> queryMaster(User loginUser) {
 
         Map<String, Object> result = new HashMap<>();
 
         List<Server> masterServers = getServerListFromRegistry(true);
         result.put(Constants.DATA_LIST, masterServers);
-        putMsg(result,Status.SUCCESS);
+        putMsg(result, Status.SUCCESS);
 
         return result;
     }
@@ -92,12 +95,10 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
      * @return zookeeper information list
      */
     @Override
-    public Map<String,Object> queryZookeeperState(User loginUser) {
+    public Map<String, Object> queryZookeeperState(User loginUser) {
         Map<String, Object> result = new HashMap<>();
 
-        List<ZookeeperRecord> zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList();
-
-        result.put(Constants.DATA_LIST, zookeeperRecordList);
+        result.put(Constants.DATA_LIST, null);
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -111,46 +112,48 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
      * @return worker information list
      */
     @Override
-    public Map<String,Object> queryWorker(User loginUser) {
+    public Map<String, Object> queryWorker(User loginUser) {
 
         Map<String, Object> result = new HashMap<>();
         List<WorkerServerModel> workerServers = getServerListFromRegistry(false)
-                .stream()
-                .map((Server server) -> {
-                    WorkerServerModel model = new WorkerServerModel();
-                    model.setId(server.getId());
-                    model.setHost(server.getHost());
-                    model.setPort(server.getPort());
-                    model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
-                    model.setResInfo(server.getResInfo());
-                    model.setCreateTime(server.getCreateTime());
-                    model.setLastHeartbeatTime(server.getLastHeartbeatTime());
-                    return model;
-                })
-                .collect(Collectors.toList());
+            .stream()
+            .map((Server server) -> {
+                WorkerServerModel model = new WorkerServerModel();
+                model.setId(server.getId());
+                model.setHost(server.getHost());
+                model.setPort(server.getPort());
+                model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
+                model.setResInfo(server.getResInfo());
+                model.setCreateTime(server.getCreateTime());
+                model.setLastHeartbeatTime(server.getLastHeartbeatTime());
+                return model;
+            })
+            .collect(Collectors.toList());
 
         Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
-                .stream()
-                .collect(Collectors.toMap(
-                    (WorkerServerModel worker) -> {
-                        String[] s = worker.getZkDirectories().iterator().next().split("/");
-                        return s[s.length - 1];
-                    }
-                    , Function.identity()
-                    , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
-                        oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
-                        return oldOne;
-                    }));
+            .stream()
+            .collect(Collectors.toMap(
+                (WorkerServerModel worker) -> {
+                    String[] s = worker.getZkDirectories().iterator().next().split("/");
+                    return s[s.length - 1];
+                }
+                , Function.identity()
+                , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
+                    oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
+                    return oldOne;
+                }));
 
         result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
-        putMsg(result,Status.SUCCESS);
+        putMsg(result, Status.SUCCESS);
 
         return result;
     }
 
     @Override
     public List<Server> getServerListFromRegistry(boolean isMaster) {
-        return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers();
+        return isMaster
+            ? registryClient.getServerList(NodeType.MASTER)
+            : registryClient.getServerList(NodeType.WORKER);
     }
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 379af64..06e6958 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.api.service.impl;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.WorkerGroupService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,14 +30,17 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang.StringUtils;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -58,10 +60,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class);
 
     @Autowired
-    WorkerGroupMapper workerGroupMapper;
+    private WorkerGroupMapper workerGroupMapper;
 
     @Autowired
-    ProcessInstanceMapper processInstanceMapper;
+    private ProcessInstanceMapper processInstanceMapper;
+
+    @Autowired
+    private RegistryClient registryClient;
 
     /**
      * create or update a worker group
@@ -139,7 +144,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         }
         // check zookeeper
         String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + workerGroup.getName();
-        return RegistryCenterUtils.isNodeExisted(workerGroupPath);
+        return registryClient.exists(workerGroupPath);
     }
 
     /**
@@ -149,7 +154,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
      * @return boolean
      */
     private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
-        Map<String, String> serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true);
+        Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
         if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
             return null;
         }
@@ -250,11 +255,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
         // worker groups from zookeeper
         String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-        List<String> workerGroupList = null;
+        Collection<String> workerGroupList = null;
         try {
-            workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath);
+            workerGroupList = registryClient.getChildrenKeys(workerPath);
         } catch (Exception e) {
-            logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
+            logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
         }
 
         if (CollectionUtils.isEmpty(workerGroupList)) {
@@ -268,9 +273,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
 
         for (String workerGroup : workerGroupList) {
             String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
-            List<String> childrenNodes = null;
+            Collection<String> childrenNodes = null;
             try {
-                childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath);
+                childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
             } catch (Exception e) {
                 logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
             }
@@ -281,7 +286,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
             wg.setName(workerGroup);
             if (isPaging) {
                 wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
-                String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.get(0));
+                String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
                 wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
                 wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
                 wg.setSystemDefault(true);
@@ -328,7 +333,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
     @Override
     public Map<String, Object> getWorkerAddressList() {
         Map<String, Object> result = new HashMap<>();
-        List<String> serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true);
+        Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
         result.put(Constants.DATA_LIST, serverNodeList);
         putMsg(result, Status.SUCCESS);
         return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java
deleted file mode 100644
index 71e0456..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils;
-
-import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * monitor zookeeper info todo registry-spi
- * fixme Some of the information obtained in the api belongs to the unique information of zk.
- * I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided.
- */
-public class RegistryCenterUtils {
-
-    private static RegistryClient registryClient = RegistryClient.getInstance();
-
-    /**
-     * @return zookeeper info list
-     */
-    public static List<ZookeeperRecord> zookeeperInfoList() {
-        return null;
-    }
-
-    /**
-     * get master servers
-     *
-     * @return master server information
-     */
-    public static List<Server> getMasterServers() {
-        return registryClient.getServerList(NodeType.MASTER);
-    }
-
-    /**
-     * master construct is the same with worker, use the master instead
-     *
-     * @return worker server informations
-     */
-    public static List<Server> getWorkerServers() {
-        return registryClient.getServerList(NodeType.WORKER);
-    }
-
-    public static Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
-        return registryClient.getServerMaps(nodeType, hostOnly);
-    }
-
-    public static List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
-        return registryClient.getServerNodeList(nodeType, hostOnly);
-    }
-
-    public static boolean isNodeExisted(String key) {
-        return registryClient.isExisted(key);
-    }
-
-    public static List<String> getChildrenNodes(final String key) {
-        return registryClient.getChildrenKeys(key);
-    }
-
-    public static String getNodeData(String key) {
-        return registryClient.get(key);
-    }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
index 41a1cc3..dcc9da3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
@@ -20,11 +20,9 @@ package org.apache.dolphinscheduler.api.controller;
 import org.apache.dolphinscheduler.api.ApiApplicationServer;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.SessionService;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -36,11 +34,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
@@ -51,11 +44,8 @@ import org.springframework.web.context.WebApplicationContext;
 /**
  * abstract controller test
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(classes = ApiApplicationServer.class)
-@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class })
-@PowerMockIgnore({"javax.management.*"})
 public class AbstractControllerTest {
 
     public static final String SESSION_ID = "sessionId";
@@ -74,9 +64,6 @@ public class AbstractControllerTest {
 
     @Before
     public void setUp() {
-        PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
-        PowerMockito.mockStatic(RegistryCenterUtils.class);
-
         mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
 
         createSession();
@@ -98,7 +85,7 @@ public class AbstractControllerTest {
         String session = sessionService.createSession(loginUser, "127.0.0.1");
         sessionId = session;
 
-        Assert.assertTrue(!StringUtils.isEmpty(session));
+        Assert.assertFalse(StringUtils.isEmpty(session));
     }
 
     public Map<String, Object> success() {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
index 62022d8..a0a1772 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
@@ -48,9 +48,6 @@ import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-/**
- * scheduler controller test
- */
 public class SchedulerControllerTest extends AbstractControllerTest {
 
     private static Logger logger = LoggerFactory.getLogger(SchedulerControllerTest.class);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
index 486707c..a61e7d3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
@@ -50,9 +50,6 @@ import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-/**
- * task instance controller test
- */
 public class TaskInstanceControllerTest extends AbstractControllerTest {
 
     @InjectMocks
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
index 14cd52f..7f675a5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
@@ -37,9 +37,6 @@ import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-/**
- * tenant controller test
- */
 public class TenantControllerTest extends AbstractControllerTest {
 
     private static Logger logger = LoggerFactory.getLogger(TenantControllerTest.class);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index f6e79bf..873236f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -59,12 +59,15 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
     @MockBean
     private ProcessInstanceMapper processInstanceMapper;
 
+    @MockBean
+    private RegistryClient registryClient;
+
     @Test
     public void testSaveWorkerGroup() throws Exception {
         Map<String, String> serverMaps = new HashMap<>();
         serverMaps.put("192.168.0.1", "192.168.0.1");
         serverMaps.put("192.168.0.2", "192.168.0.2");
-        PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
+        PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
 
         MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
         paramsMap.add("name","cxc_work_group");
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 0c76835..1dfcc64 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.ApiApplicationServer;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
@@ -33,105 +34,32 @@ import java.util.List;
 import java.util.Map;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-/**
- * worker group service test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
-@PowerMockIgnore({"javax.management.*"})
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = ApiApplicationServer.class)
 public class WorkerGroupServiceTest {
 
+    @MockBean
+    private RegistryClient registryClient;
 
-    @InjectMocks
+    @Autowired
     private WorkerGroupServiceImpl workerGroupService;
 
-    @Mock
+    @MockBean
     private WorkerGroupMapper workerGroupMapper;
 
-    @Mock
+    @MockBean
     private ProcessInstanceMapper processInstanceMapper;
 
-
     private String groupName = "groupName000001";
 
-    /*    @Before
-    public void init() {
-        ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
-        zookeeperConfig.setDsRoot("/dolphinscheduler_qzw");
-        Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
-
-        String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
-
-        List<String> workerGroupStrList = new ArrayList<>();
-        workerGroupStrList.add("default");
-        workerGroupStrList.add("test");
-        Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList);
-
-        List<String> defaultAddressList = new ArrayList<>();
-        defaultAddressList.add("192.168.220.188:1234");
-        defaultAddressList.add("192.168.220.189:1234");
-
-        Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList);
-
-        Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
-    }
-
-*//**
-     *  create or update a worker group
-     *//*
-    @Test
-    public void testSaveWorkerGroup() {
-        // worker server maps
-        Map<String, String> serverMaps = new HashMap<>();
-        serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214");
-        Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps);
-
-        User user = new User();
-        // general user add
-        user.setUserType(UserType.GENERAL_USER);
-        Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
-        Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG));
-
-        // success
-        user.setUserType(UserType.ADMIN_USER);
-        result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
-        Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
-        // group name exist
-        Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2));
-        Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList());
-        result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234");
-        Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS));
-    }*/
-
-    /**
-     * query worker group paging
-     */
-    /* @Test
-    public void testQueryAllGroupPaging() {
-        User user = new User();
-        // general user add
-        user.setUserType(UserType.ADMIN_USER);
-        Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, null);
-        PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
-        Assert.assertEquals(pageInfo.getLists().size(), 1);
-    }*/
-
-    @Before
-    public void before() {
-        PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
-    }
-
     @Test
     public void testQueryAllGroup() {
         Map<String, Object> result = workerGroupService.queryAllGroup();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java
deleted file mode 100644
index b8b4945..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils;
-
-import org.apache.dolphinscheduler.common.model.Server;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * zookeeper monitor utils test
- */
-@Ignore
-public class RegistryCenterUtilsTest {
-
-    @Test
-    public void testGetMasterList(){
-        List<Server> masterServerList = RegistryCenterUtils.getMasterServers();
-        List<Server> workerServerList = RegistryCenterUtils.getWorkerServers();
-
-        Assert.assertTrue(masterServerList.size() >= 0);
-        Assert.assertTrue(workerServerList.size() >= 0);
-    }
-
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fa9a490..410070e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -90,8 +90,6 @@ public final class Constants {
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
-    public static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
-    public static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
     public static final String REGISTRY_SERVERS = "registry.servers";
 
     /**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
index acc3c02..cb247cf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
@@ -16,15 +16,6 @@
  */
 package org.apache.dolphinscheduler.common.enums;
 
-/**
- * zk node type
- */
 public enum NodeType {
-
-    /**
-     * 0 master node;
-     * 1 worker node;
-     * 2 dead_server node;
-     */
-    MASTER, WORKER, DEAD_SERVER;
+    MASTER, WORKER, DEAD_SERVER
 }
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index 9ecd9ef..d7119d0 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -75,11 +75,6 @@
             <unpack/>
         </artifact>
     </artifactSet>
-    <artifactSet to="lib/plugin/registry/zookeeper">
-        <artifact id="${project.groupId}:dolphinscheduler-registry-zookeeper:zip:${project.version}">
-            <unpack/>
-        </artifact>
-    </artifactSet>
     <!-- Task Plugins -->
     <artifactSet to="lib/plugin/task/datax">
         <artifact id="${project.groupId}:dolphinscheduler-task-datax:zip:${project.version}">
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
deleted file mode 100644
index cda98ef..0000000
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperConnectionStateListener implements ConnectionStateListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConnectionStateListener.class);
-
-    private RegistryConnectListener registryConnectListener;
-
-    public ZookeeperConnectionStateListener(RegistryConnectListener registryConnectListener) {
-        this.registryConnectListener = registryConnectListener;
-    }
-
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-
-        if (newState == ConnectionState.LOST) {
-            logger.error("connection lost from zookeeper");
-            registryConnectListener.notify(RegistryConnectState.LOST);
-        } else if (newState == ConnectionState.RECONNECTED) {
-            logger.info("reconnected to zookeeper");
-            registryConnectListener.notify(RegistryConnectState.RECONNECTED);
-        } else if (newState == ConnectionState.SUSPENDED) {
-            logger.warn("zookeeper connection SUSPENDED");
-            registryConnectListener.notify(RegistryConnectState.SUSPENDED);
-        }
-
-    }
-
-}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java
deleted file mode 100644
index 85723ad..0000000
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-
-import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * zookeeper registry plugin
- */
-public class ZookeeperRegistryPlugin implements DolphinSchedulerPlugin {
-
-    @Override
-    public Iterable<RegistryFactory> getRegisterFactorys() {
-        return ImmutableList.of(new ZookeeperRegistryFactory());
-    }
-}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
new file mode 100644
index 0000000..1032710
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to Apache Software Foundation (ASF) under one or more contributor
+  ~ license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright
+  ~ ownership. Apache Software Foundation (ASF) licenses this file to you under
+  ~ the Apache License, Version 2.0 (the "License"); you may
+  ~ not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-registry</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dolphinscheduler-registry-api</artifactId>
+</project>
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java
new file mode 100644
index 0000000..eaebc81
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * Callback that is handed a {@link ConnectionState} value.
+ *
+ * <p>Listeners of this type are accepted by
+ * {@link Registry#addConnectionStateListener(ConnectionListener)}. The interface has a single
+ * abstract method, so it can be implemented with a lambda or method reference.
+ */
+@FunctionalInterface
+public interface ConnectionListener {
+    /**
+     * Receives a connection state.
+     *
+     * @param newState the reported state; NOTE(review): presumably the state the registry
+     *                 connection has just changed to - confirm against the implementations
+     */
+    void onUpdate(ConnectionState newState);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java
new file mode 100644
index 0000000..fef3bca
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * Connection states that can be passed to {@link ConnectionListener#onUpdate(ConnectionState)}.
+ *
+ * <p>NOTE(review): the per-constant meanings are not defined in this file; they presumably mirror
+ * the connection states of the underlying registry client - confirm against the registry plugins.
+ */
+public enum ConnectionState {
+    CONNECTED,
+    RECONNECTED,
+    SUSPENDED,
+    DISCONNECTED
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java
new file mode 100644
index 0000000..be781db
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+/**
+ * Value object carrying a {@link Type} plus a key, a path and a data string; it is the argument of
+ * {@link SubscribeListener#notify(Event)}.
+ *
+ * <p>All members are generated by Lombok. Because of {@code @Accessors(fluent = true)} the
+ * accessors are named after the fields ({@code key()}, {@code key(String)}, ...) instead of
+ * {@code getKey()}/{@code setKey(String)}; instances can also be created through
+ * {@code Event.builder()}.
+ *
+ * <p>NOTE(review): how {@code key} and {@code path} differ is not visible here - presumably
+ * {@code path} is the subscribed path and {@code key} the node that changed; confirm against
+ * the registry plugins that create events.
+ */
+@Getter
+@Setter
+@Builder
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class Event {
+    private String key;
+    private String path;
+    private String data;
+    private Type type;
+
+    /** Kind of change an {@link Event} reports. */
+    public enum Type {
+        ADD,
+        REMOVE,
+        UPDATE
+    }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
new file mode 100644
index 0000000..6057a7e
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Client-side abstraction of a registry: a string key/value store with child listing, change
+ * subscriptions, connection-state callbacks and named locks.
+ *
+ * <p>Instances are obtained from {@link RegistryFactory#create()}. The interface extends
+ * {@link Closeable}, so implementations must also provide {@code close()}.
+ *
+ * <p>NOTE(review): only the signatures are defined here; the per-method notes below marked
+ * "presumably" describe the expected contract and should be confirmed against the plugins.
+ */
+public interface Registry extends Closeable {
+    /** Starts the registry with the given configuration entries. */
+    void start(Map<String, String> config);
+
+    /**
+     * Registers {@code listener} for {@code path}.
+     *
+     * @return presumably {@code true} when the subscription was established - TODO confirm
+     */
+    boolean subscribe(String path, SubscribeListener listener);
+
+    /** Removes the subscription for {@code path}. */
+    void unsubscribe(String path);
+
+    /** Adds a listener that is handed {@link ConnectionState} values. */
+    void addConnectionStateListener(ConnectionListener listener);
+
+    /** Returns the value stored under {@code key}. */
+    String get(String key);
+
+    /**
+     * Stores {@code value} under {@code key}.
+     *
+     * @param deleteOnDisconnect presumably marks the entry as ephemeral, i.e. removed once this
+     *                           client's connection ends - TODO confirm
+     */
+    void put(String key, String value, boolean deleteOnDisconnect);
+
+    /** Deletes {@code key}. */
+    void delete(String key);
+
+    /** Returns the children of {@code key}. */
+    Collection<String> children(String key);
+
+    /** Returns whether {@code key} exists. */
+    boolean exists(String key);
+
+    /**
+     * Acquires the lock identified by {@code key}.
+     *
+     * @return presumably {@code true} when the lock was obtained - TODO confirm
+     */
+    boolean acquireLock(String key);
+
+    /**
+     * Releases the lock identified by {@code key}.
+     *
+     * @return presumably {@code true} when the lock was released - TODO confirm
+     */
+    boolean releaseLock(String key);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java
new file mode 100644
index 0000000..b88fe25
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * Unchecked exception type of the registry API.
+ *
+ * <p>It extends {@link RuntimeException}, so callers are not forced to catch it, and it is
+ * {@code final}, so it cannot be subclassed.
+ */
+public final class RegistryException extends RuntimeException {
+
+    /**
+     * Creates an exception with a detail message and the underlying cause.
+     *
+     * @param message description of the failure
+     * @param cause   the throwable that triggered this exception
+     */
+    public RegistryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception with a detail message only.
+     *
+     * @param message description of the failure
+     */
+    public RegistryException(String message) {
+        super(message);
+    }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java
new file mode 100644
index 0000000..6903e95
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * Factory for {@link Registry} instances, identified by a name.
+ *
+ * <p>{@link RegistryFactoryLoader#load()} discovers implementations through
+ * {@link java.util.ServiceLoader} and indexes them by {@link #name()}.
+ */
+public interface RegistryFactory {
+    /** Returns the name under which this factory is indexed by {@link RegistryFactoryLoader}. */
+    String name();
+
+    /** Creates a {@link Registry}. */
+    Registry create();
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java
new file mode 100644
index 0000000..45d4596
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Function;
+import java.util.stream.StreamSupport;
+
+/**
+ * Discovers {@link RegistryFactory} implementations through {@link ServiceLoader}.
+ */
+public final class RegistryFactoryLoader {
+
+    private RegistryFactoryLoader() {
+        // static utility, not meant to be instantiated
+    }
+
+    /**
+     * Loads every {@link RegistryFactory} visible to {@link ServiceLoader} and indexes it by
+     * {@link RegistryFactory#name()}.
+     *
+     * @return the discovered factories keyed by name; empty when none is found
+     * @throws IllegalStateException if two factories report the same name, because
+     *                               {@code Collectors.toMap} rejects duplicate keys
+     */
+    public static Map<String, RegistryFactory> load() {
+        final ServiceLoader<RegistryFactory> factories = ServiceLoader.load(RegistryFactory.class);
+        return StreamSupport.stream(factories.spliterator(), false)
+                            .collect(toMap(RegistryFactory::name, Function.identity()));
+    }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java
new file mode 100644
index 0000000..2432eb1
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+/**
+ * Callback that is handed {@link Event}s; listeners are registered through
+ * {@link Registry#subscribe(String, SubscribeListener)}.
+ *
+ * <p>Declared as a functional interface, like {@link ConnectionListener}, so the compiler rejects
+ * any later change that would break lambda implementations.
+ */
+@FunctionalInterface
+public interface SubscribeListener {
+    /**
+     * Receives an event.
+     *
+     * <p>This one-argument method is an overload of, and unrelated to, {@link Object#notify()}.
+     *
+     * @param event the event being delivered
+     */
+    void notify(Event event);
+}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
similarity index 88%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
index 5ad7ee9..42690b9 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
@@ -19,18 +19,19 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>dolphinscheduler-registry-plugin</artifactId>
+        <artifactId>dolphinscheduler-registry-plugins</artifactId>
         <groupId>org.apache.dolphinscheduler</groupId>
         <version>2.0.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-
     <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
-    <!-- can be load as a Alert Plugin when development and run server in IDE -->
-    <packaging>dolphinscheduler-plugin</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-registry-api</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
@@ -57,7 +58,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
 
-
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
@@ -76,11 +76,5 @@
             <classifier>runtime</classifier>
             <scope>test</scope>
         </dependency>
-
     </dependencies>
-
-    <build>
-        <finalName>dolphinscheduler-registry-zookeeper-${project.version}</finalName>
-    </build>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
similarity index 100%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
new file mode 100644
index 0000000..9526e0e
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.zookeeper;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Adapts Curator connection-state callbacks to the registry API's {@link ConnectionListener}.
+ *
+ * <p>Curator {@code LOST} is reported as {@link ConnectionState#DISCONNECTED}, {@code RECONNECTED}
+ * as {@link ConnectionState#RECONNECTED} and {@code SUSPENDED} as {@link ConnectionState#SUSPENDED};
+ * every other Curator state is ignored.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class ZookeeperConnectionStateListener implements ConnectionStateListener {
+    /** Registry-level listener that receives the translated state. */
+    private final ConnectionListener listener;
+
+    /**
+     * Logs the state change and forwards the mapped state to the wrapped listener.
+     *
+     * @param client   the Curator client passed by the callback (not used here)
+     * @param newState the Curator connection state; written fully qualified because its
+     *                 simple name clashes with the imported registry {@code ConnectionState}
+     */
+    @Override
+    public void stateChanged(CuratorFramework client,
+                             org.apache.curator.framework.state.ConnectionState newState) {
+        switch (newState) {
+            case LOST:
+                log.warn("Registry disconnected");
+                listener.onUpdate(ConnectionState.DISCONNECTED);
+                break;
+            case RECONNECTED:
+                log.info("Registry reconnected");
+                listener.onUpdate(ConnectionState.RECONNECTED);
+                break;
+            case SUSPENDED:
+                log.warn("Registry suspended");
+                listener.onUpdate(ConnectionState.SUSPENDED);
+                break;
+            default:
+                // Any other Curator state is not forwarded to the listener.
+                break;
+        }
+    }
+}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
similarity index 53%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index e84666a..89cb280 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -28,21 +28,19 @@ import static org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperCon
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.ListenerManager;
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryException;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
@@ -56,28 +54,18 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Strings;
 
-public class ZookeeperRegistry implements Registry {
+public final class ZookeeperRegistry implements Registry {
 
     private CuratorFramework client;
 
-    /**
-     * treeCache map
-     * k-subscribe key
-     * v-listener
-     */
-    private Map<String, TreeCache> treeCacheMap = new HashMap<>();
-
-    /**
-     * Distributed lock map
-     */
-    private ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
-
-    /**
-     * build retry policy
-     */
+    private final Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
+
+    private static final ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
+
     private static RetryPolicy buildRetryPolicy(Map<String, String> registerData) {
         int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(registerData.get(BASE_SLEEP_TIME.getName()));
         int maxRetries = MAX_RETRIES.getParameterValue(registerData.get(MAX_RETRIES.getName()));
@@ -85,35 +73,32 @@ public class ZookeeperRegistry implements Registry {
         return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepMs);
     }
 
-    /**
-     * build digest
-     */
     private static void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
         builder.authorization(DIGEST.getName(), digest.getBytes(StandardCharsets.UTF_8))
-                .aclProvider(new ACLProvider() {
-                    @Override
-                    public List<ACL> getDefaultAcl() {
-                        return ZooDefs.Ids.CREATOR_ALL_ACL;
-                    }
-
-                    @Override
-                    public List<ACL> getAclForPath(final String path) {
-                        return ZooDefs.Ids.CREATOR_ALL_ACL;
-                    }
-                });
+               .aclProvider(new ACLProvider() {
+                   @Override
+                   public List<ACL> getDefaultAcl() {
+                       return ZooDefs.Ids.CREATOR_ALL_ACL;
+                   }
+
+                   @Override
+                   public List<ACL> getAclForPath(final String path) {
+                       return ZooDefs.Ids.CREATOR_ALL_ACL;
+                   }
+               });
     }
 
     @Override
-    public void init(Map<String, String> registerData) {
-
-        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
-                .connectString(SERVERS.getParameterValue(registerData.get(SERVERS.getName())))
-                .retryPolicy(buildRetryPolicy(registerData))
-                .namespace(NAME_SPACE.getParameterValue(registerData.get(NAME_SPACE.getName())))
-                .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(registerData.get(SESSION_TIMEOUT_MS.getName())))
-                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(registerData.get(CONNECTION_TIMEOUT_MS.getName())));
-
-        String digest = DIGEST.getParameterValue(registerData.get(DIGEST.getName()));
+    public void start(Map<String, String> config) {
+        CuratorFrameworkFactory.Builder builder =
+            CuratorFrameworkFactory.builder()
+                                   .connectString(SERVERS.getParameterValue(config.get(SERVERS.getName())))
+                                   .retryPolicy(buildRetryPolicy(config))
+                                   .namespace(NAME_SPACE.getParameterValue(config.get(NAME_SPACE.getName())))
+                                   .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(config.get(SESSION_TIMEOUT_MS.getName())))
+                                   .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(config.get(CONNECTION_TIMEOUT_MS.getName())));
+
+        String digest = DIGEST.getParameterValue(config.get(DIGEST.getName()));
         if (!Strings.isNullOrEmpty(digest)) {
             buildDigest(builder, digest);
         }
@@ -121,7 +106,7 @@ public class ZookeeperRegistry implements Registry {
 
         client.start();
         try {
-            if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(registerData.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
+            if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(config.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
                 client.close();
                 throw new RegistryException("zookeeper connect timeout");
             }
@@ -132,55 +117,53 @@ public class ZookeeperRegistry implements Registry {
     }
 
     @Override
-    public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
-        client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(registryConnectListener));
+    public void addConnectionStateListener(ConnectionListener listener) {
+        client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
     }
 
+    /**
+     * Registers {@code listener} for change events under {@code path}.
+     *
+     * <p>One {@link TreeCache} is kept per path and is started only by the call that creates it.
+     * Later subscriptions to the same path only attach another listener, because
+     * {@code TreeCache#start()} fails when invoked a second time on the same instance.
+     *
+     * @param path     registry key to watch
+     * @param listener callback notified with an adapted {@link Event} for every cache event
+     * @return always {@code true}
+     * @throws RegistryException if the cache for {@code path} cannot be started
+     */
     @Override
-    public boolean subscribe(String path, SubscribeListener subscribeListener) {
-        if (null != treeCacheMap.get(path)) {
-            return false;
-        }
-        TreeCache treeCache = new TreeCache(client, path);
-        TreeCacheListener treeCacheListener = (client, event) -> {
-            TreeCacheEvent.Type type = event.getType();
-            DataChangeEvent eventType = null;
-            String dataPath = null;
-            switch (type) {
-                case NODE_ADDED:
-                    dataPath = event.getData().getPath();
-                    eventType = DataChangeEvent.ADD;
-                    break;
-                case NODE_UPDATED:
-                    eventType = DataChangeEvent.UPDATE;
-                    dataPath = event.getData().getPath();
-                    break;
-                case NODE_REMOVED:
-                    eventType = DataChangeEvent.REMOVE;
-                    dataPath = event.getData().getPath();
-                    break;
-                default:
-            }
-            if (null != eventType && null != dataPath) {
-                ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType);
-            }
-        };
-        treeCache.getListenable().addListener(treeCacheListener);
-        treeCacheMap.put(path, treeCache);
+    public boolean subscribe(String path, SubscribeListener listener) {
+        // Set by the mapping function, so it is true only for the call that created the cache.
+        final boolean[] created = {false};
+        final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> {
+            created[0] = true;
+            return new TreeCache(client, path);
+        });
+        treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
+        if (!created[0]) {
+            // The cache was already started by an earlier subscription.
+            return true;
+        }
         try {
             treeCache.start();
         } catch (Exception e) {
-            throw new RegistryException("start zookeeper tree cache error", e);
+            treeCacheMap.remove(path, treeCache);
+            CloseableUtils.closeQuietly(treeCache);
+            throw new RegistryException("Failed to subscribe listener for key: " + path, e);
         }
-        ListenerManager.addListener(path, subscribeListener);
         return true;
     }
 
+    /**
+     * Stops watching {@code path}: removes its {@link TreeCache} and closes it quietly.
+     * The entry is removed so that a later {@link #subscribe} creates a fresh cache
+     * instead of reusing a closed one. Does nothing if {@code path} is not subscribed.
+     */
     @Override
     public void unsubscribe(String path) {
-        TreeCache treeCache = treeCacheMap.get(path);
-        treeCache.close();
-        ListenerManager.removeListener(path);
+        CloseableUtils.closeQuietly(treeCacheMap.remove(path));
     }
 
     @Override
@@ -193,12 +149,7 @@ public class ZookeeperRegistry implements Registry {
     }
 
     @Override
-    public void remove(String key) {
-        delete(key);
-    }
-
-    @Override
-    public boolean isExisted(String key) {
+    public boolean exists(String key) {
         try {
             return null != client.checkExists().forPath(key);
         } catch (Exception e) {
@@ -207,47 +158,22 @@ public class ZookeeperRegistry implements Registry {
     }
 
     @Override
-    public void persist(String key, String value) {
-        try {
-            if (isExisted(key)) {
-                update(key, value);
-                return;
-            }
-            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+    public void put(String key, String value, boolean deleteOnDisconnect) {
+        final CreateMode mode = deleteOnDisconnect ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
 
-        } catch (Exception e) {
-            throw new RegistryException("zookeeper persist error", e);
-        }
-    }
-
-    @Override
-    public void persistEphemeral(String key, String value) {
         try {
-            if (isExisted(key)) {
-                update(key, value);
-                return;
-            }
-            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+            client.create()
+                  .orSetData() // overwrite the value if the key already exists
+                  .creatingParentsIfNeeded() // create missing parent nodes
+                  .withMode(mode) // EPHEMERAL when deleteOnDisconnect, else PERSISTENT
+                  .forPath(key, value.getBytes(StandardCharsets.UTF_8));
         } catch (Exception e) {
-            throw new RegistryException("zookeeper persist ephemeral error", e);
+            throw new RegistryException("Failed to put registry key: " + key, e);
         }
     }
 
     @Override
-    public void update(String key, String value) {
-        try {
-            if (!isExisted(key)) {
-                return;
-            }
-            TransactionOp transactionOp = client.transactionOp();
-            client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
-        } catch (Exception e) {
-            throw new RegistryException("zookeeper update error", e);
-        }
-    }
-
-    @Override
-    public List<String> getChildren(String key) {
+    public List<String> children(String key) {
         try {
             List<String> result = client.getChildren().forPath(key);
             result.sort(Comparator.reverseOrder());
@@ -258,23 +184,20 @@ public class ZookeeperRegistry implements Registry {
     }
 
     @Override
-    public boolean delete(String nodePath) {
+    public void delete(String nodePath) {
         try {
             client.delete()
-                    .deletingChildrenIfNeeded()
-                    .forPath(nodePath);
-        } catch (KeeperException.NoNodeException ignore) {
-            // the node is not exist, we can believe the node has been removed
-
+                  .deletingChildrenIfNeeded()
+                  .forPath(nodePath);
+        } catch (KeeperException.NoNodeException ignored) {
+            // Is already deleted or does not exist
         } catch (Exception e) {
-            throw new RegistryException("zookeeper delete key error", e);
+            throw new RegistryException("Failed to delete registry key: " + nodePath, e);
         }
-        return true;
     }
 
     @Override
     public boolean acquireLock(String key) {
-
         InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
         try {
             interProcessMutex.acquire();
@@ -291,7 +214,6 @@ public class ZookeeperRegistry implements Registry {
                 throw new RegistryException("zookeeper release lock error", e);
             }
         }
-
     }
 
     @Override
@@ -311,22 +233,35 @@ public class ZookeeperRegistry implements Registry {
         return true;
     }
 
-    public CuratorFramework getClient() {
-        return client;
-    }
-
     @Override
     public void close() {
-        treeCacheMap.forEach((key, value) -> value.close());
-        waitForCacheClose(500);
+        treeCacheMap.values().forEach(CloseableUtils::closeQuietly);
         CloseableUtils.closeQuietly(client);
     }
 
-    private void waitForCacheClose(long millis) {
-        try {
-            Thread.sleep(millis);
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
+    static final class EventAdaptor extends Event { // adapts a Curator TreeCacheEvent to a registry Event
+        public EventAdaptor(TreeCacheEvent event, String key) {
+            key(key); // the key supplied by the caller
+
+            switch (event.getType()) {
+                case NODE_ADDED:
+                    type(Type.ADD);
+                    break;
+                case NODE_UPDATED:
+                    type(Type.UPDATE);
+                    break;
+                case NODE_REMOVED:
+                    type(Type.REMOVE);
+                    break;
+                default: // other cache event types leave the type unset
+                    break;
+            }
+
+            final ChildData data = event.getData();
+            if (data != null) {
+                path(data.getPath());
+                data(new String(data.getData(), StandardCharsets.UTF_8)); // decode as UTF-8, the charset put() writes with
+            }
         }
     }
 }
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
similarity index 76%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
index 1ecf3e0..949df21 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
@@ -17,16 +17,16 @@
 
 package org.apache.dolphinscheduler.plugin.registry.zookeeper;
 
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryFactory;
 
-/**
- * Zookeeper registry factory
- */
-public class ZookeeperRegistryFactory implements RegistryFactory {
+import com.google.auto.service.AutoService;
+
+@AutoService(RegistryFactory.class)
+public final class ZookeeperRegistryFactory implements RegistryFactory {
 
     @Override
-    public String getName() {
+    public String name() {
         return "zookeeper";
     }
 
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
similarity index 87%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index a5dc33b..8442c02 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.dolphinscheduler.plugin.registry.zookeeper;
 
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 
 import org.apache.curator.test.TestingServer;
 
@@ -50,18 +50,18 @@ public class ZookeeperRegistryTest {
         server = new TestingServer(true);
         Map<String, String> registryConfig = new HashMap<>();
         registryConfig.put(ZookeeperConfiguration.SERVERS.getName(), server.getConnectString());
-        registry.init(registryConfig);
-        registry.persist("/sub", "");
+        registry.start(registryConfig);
+        registry.put("/sub", "", false);
     }
 
     @Test
     public void persistTest() {
-        registry.persist("/nodes/m1", "");
-        registry.persist("/nodes/m2", "");
-        Assert.assertEquals(Arrays.asList("m2", "m1"), registry.getChildren("/nodes"));
-        Assert.assertTrue(registry.isExisted("/nodes/m1"));
+        registry.put("/nodes/m1", "", false);
+        registry.put("/nodes/m2", "", false);
+        Assert.assertEquals(Arrays.asList("m2", "m1"), registry.children("/nodes"));
+        Assert.assertTrue(registry.exists("/nodes/m1"));
         registry.delete("/nodes/m2");
-        Assert.assertFalse(registry.isExisted("/nodes/m2"));
+        Assert.assertFalse(registry.exists("/nodes/m2"));
     }
 
     @Test
@@ -112,10 +112,9 @@ public class ZookeeperRegistryTest {
 
     }
 
-    class TestListener implements SubscribeListener {
-
+    static class TestListener implements SubscribeListener {
         @Override
-        public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+        public void notify(Event event) {
             logger.info("I'm test listener");
         }
     }
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
new file mode 100644
index 0000000..4f3db56
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to Apache Software Foundation (ASF) under one or more contributor
+  ~ license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright
+  ~ ownership. Apache Software Foundation (ASF) licenses this file to you under
+  ~ the Apache License, Version 2.0 (the "License"); you may
+  ~ not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>dolphinscheduler-registry</artifactId>
+        <groupId>org.apache.dolphinscheduler</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>dolphinscheduler-registry-plugins</artifactId>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>dolphinscheduler-registry-zookeeper</module>
+    </modules>
+</project>
diff --git a/dolphinscheduler-registry-plugin/pom.xml b/dolphinscheduler-registry/pom.xml
similarity index 74%
rename from dolphinscheduler-registry-plugin/pom.xml
rename to dolphinscheduler-registry/pom.xml
index 2f8bb0d..8ce7c24 100644
--- a/dolphinscheduler-registry-plugin/pom.xml
+++ b/dolphinscheduler-registry/pom.xml
@@ -24,20 +24,25 @@
         <version>2.0.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <groupId>org.apache.dolphinscheduler</groupId>
-    <artifactId>dolphinscheduler-registry-plugin</artifactId>
+    <artifactId>dolphinscheduler-registry</artifactId>
     <packaging>pom</packaging>
 
+    <modules>
+        <module>dolphinscheduler-registry-api</module>
+        <module>dolphinscheduler-registry-plugins</module>
+    </modules>
+
     <dependencies>
-        <!-- dolphinscheduler -->
         <dependency>
-            <groupId>org.apache.dolphinscheduler</groupId>
-            <artifactId>dolphinscheduler-spi</artifactId>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.22</version>
             <scope>provided</scope>
         </dependency>
     </dependencies>
-
-    <modules>
-        <module>dolphinscheduler-registry-zookeeper</module>
-    </modules>
-</project>
\ No newline at end of file
+</project>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 7ef9432..6c47f84 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -42,11 +43,10 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
 
 import org.apache.commons.lang.StringUtils;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -79,6 +79,7 @@ public class MasterRegistryClient {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
     private RegistryClient registryClient;
 
     /**
@@ -104,12 +105,11 @@ public class MasterRegistryClient {
 
     public void init() {
         this.startupTime = System.currentTimeMillis();
-        this.registryClient = RegistryClient.getInstance();
         this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
     }
 
     public void start() {
-        String nodeLock = registryClient.getMasterStartUpLockPath();
+        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
         try {
             // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
 
@@ -117,7 +117,7 @@ public class MasterRegistryClient {
             // master registry
             registry();
             String registryPath = getMasterPath();
-            registryClient.handleDeadServer(registryPath, NodeType.MASTER, Constants.DELETE_OP);
+            registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP);
 
             // init system node
 
@@ -143,7 +143,8 @@ public class MasterRegistryClient {
     }
 
     public void closeRegistry() {
-        unRegistry();
+        // TODO unsubscribe MasterRegistryDataListener
+        deregister();
     }
 
     /**
@@ -167,7 +168,7 @@ public class MasterRegistryClient {
                     return;
                 }
                 // handle dead server
-                registryClient.handleDeadServer(path, nodeType, Constants.ADD_OP);
+                registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
             }
             //failover server
             if (failover) {
@@ -209,9 +210,9 @@ public class MasterRegistryClient {
     private String getFailoverLockPath(NodeType nodeType) {
         switch (nodeType) {
             case MASTER:
-                return registryClient.getMasterFailoverLockPath();
+                return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
             case WORKER:
-                return registryClient.getWorkerFailoverLockPath();
+                return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
             default:
                 return "";
         }
@@ -289,20 +290,20 @@ public class MasterRegistryClient {
 
             ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
             if (workerHost == null
-                    || !checkOwner
-                    || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+                || !checkOwner
+                || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
                 // only failover the task owned myself if worker down.
                 if (processInstance == null) {
                     logger.error("failover error, the process {} of task {} do not exists.",
-                            taskInstance.getProcessInstanceId(), taskInstance.getId());
+                        taskInstance.getProcessInstanceId(), taskInstance.getId());
                     continue;
                 }
                 taskInstance.setProcessInstance(processInstance);
 
                 TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-                        .buildTaskInstanceRelatedInfo(taskInstance)
-                        .buildProcessInstanceRelatedInfo(processInstance)
-                        .create();
+                                                                                       .buildTaskInstanceRelatedInfo(taskInstance)
+                                                                                       .buildProcessInstanceRelatedInfo(processInstance)
+                                                                                       .create();
                 // only kill yarn job if exists , the local thread has exited
                 ProcessUtils.killYarnJob(taskExecutionContext);
 
@@ -348,14 +349,6 @@ public class MasterRegistryClient {
         logger.info("master failover end");
     }
 
-    public void blockAcquireMutex() {
-        registryClient.getLock(registryClient.getMasterLockPath());
-    }
-
-    public void releaseLock() {
-        registryClient.releaseLock(registryClient.getMasterLockPath());
-    }
-
     /**
      * registry
      */
@@ -364,36 +357,24 @@ public class MasterRegistryClient {
         localNodePath = getMasterPath();
         int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                masterConfig.getMasterMaxCpuloadAvg(),
-                masterConfig.getMasterReservedMemory(),
-                Sets.newHashSet(getMasterPath()),
-                Constants.MASTER_TYPE,
-                registryClient);
+            masterConfig.getMasterMaxCpuloadAvg(),
+            masterConfig.getMasterReservedMemory(),
+            Sets.newHashSet(getMasterPath()),
+            Constants.MASTER_TYPE,
+            registryClient);
 
         registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
-        registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
+        registryClient.addConnectionStateListener(newState -> {
+            if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
+                registryClient.persistEphemeral(localNodePath, "");
+            }
+        });
         this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
         logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
 
     }
 
-    class MasterRegistryConnectStateListener implements RegistryConnectListener {
-
-        @Override
-        public void notify(RegistryConnectState newState) {
-            if (RegistryConnectState.RECONNECTED == newState) {
-                registryClient.persistEphemeral(localNodePath, "");
-            }
-            if (RegistryConnectState.SUSPENDED == newState) {
-                registryClient.persistEphemeral(localNodePath, "");
-            }
-        }
-    }
-
-    /**
-     * remove registry info
-     */
-    public void unRegistry() {
+    public void deregister() {
         try {
             String address = getLocalAddress();
             String localNodePath = getMasterPath();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 4fd50a0..cb5b6be 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -22,42 +22,43 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 public class MasterRegistryDataListener implements SubscribeListener {
 
     private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class);
 
-    private MasterRegistryClient masterRegistryClient;
+    private final MasterRegistryClient masterRegistryClient;
 
     public MasterRegistryDataListener() {
         masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
     }
 
     @Override
-    public void notify(String path, String data, DataChangeEvent event) {
+    public void notify(Event event) {
+        final String path = event.path();
+        if (Strings.isNullOrEmpty(path)) {
+            return;
+        }
         //monitor master
         if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
-            handleMasterEvent(event, path);
+            handleMasterEvent(event);
         } else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
             //monitor worker
-            handleWorkerEvent(event, path);
+            handleWorkerEvent(event);
         }
     }
 
-    /**
-     * monitor master
-     *
-     * @param event event
-     * @param path path
-     */
-    public void handleMasterEvent(DataChangeEvent event, String path) {
-        switch (event) {
+    public void handleMasterEvent(Event event) {
+        final String path = event.path();
+        switch (event.type()) {
             case ADD:
                 logger.info("master node added : {}", path);
                 break;
@@ -69,14 +70,9 @@ public class MasterRegistryDataListener implements SubscribeListener {
         }
     }
 
-    /**
-     * monitor worker
-     *
-     * @param event event
-     * @param path path
-     */
-    public void handleWorkerEvent(DataChangeEvent event, String path) {
-        switch (event) {
+    public void handleWorkerEvent(Event event) {
+        final String path = event.path();
+        switch (event.type()) {
             case ADD:
                 logger.info("worker node added : {}", path);
                 break;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 1bceeb7..b7e904b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -27,16 +27,18 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Event.Type;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -101,10 +103,8 @@ public class ServerNodeManager implements InitializingBean {
      */
     private ScheduledExecutorService executorService;
 
-    /**
-     * zk client
-     */
-    private RegistryClient registryClient = RegistryClient.getInstance();
+    @Autowired
+    private RegistryClient registryClient;
 
     /**
      * eg : /node/worker/group/127.0.0.1:xxx
@@ -153,11 +153,11 @@ public class ServerNodeManager implements InitializingBean {
          */
         executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
         executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
-        /**
+        /*
          * init MasterNodeListener listener
          */
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
-        /**
+        /*
          * init WorkerNodeListener listener
          */
         registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
@@ -167,15 +167,15 @@ public class ServerNodeManager implements InitializingBean {
      * load nodes from zookeeper
      */
     public void load() {
-        /**
+        /*
          * master nodes from zookeeper
          */
         updateMasterNodes();
 
-        /**
+        /*
          * worker group nodes from zookeeper
          */
-        Set<String> workerGroups = registryClient.getWorkerGroupDirectly();
+        Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
         for (String workerGroup : workerGroups) {
             syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
         }
@@ -218,25 +218,28 @@ public class ServerNodeManager implements InitializingBean {
     class WorkerDataListener implements SubscribeListener {
 
         @Override
-        public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+        public void notify(Event event) {
+            final String path = event.path();
+            final Type type = event.type();
+            final String data = event.data();
             if (registryClient.isWorkerPath(path)) {
                 try {
-                    if (dataChangeEvent == DataChangeEvent.ADD) {
+                    if (type == Type.ADD) {
                         logger.info("worker group node : {} added.", path);
                         String group = parseGroup(path);
-                        Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
                         logger.info("currentNodes : {}", currentNodes);
                         syncWorkerGroupNodes(group, currentNodes);
-                    } else if (dataChangeEvent == DataChangeEvent.REMOVE) {
+                    } else if (type == Type.REMOVE) {
                         logger.info("worker group node : {} down.", path);
                         String group = parseGroup(path);
-                        Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
                         syncWorkerGroupNodes(group, currentNodes);
                         alertDao.sendServerStopedAlert(1, path, "WORKER");
-                    } else if (dataChangeEvent == DataChangeEvent.UPDATE) {
+                    } else if (type == Type.UPDATE) {
                         logger.debug("worker group node : {} update, data: {}", path, data);
                         String group = parseGroup(path);
-                        Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+                        Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
                         syncWorkerGroupNodes(group, currentNodes);
 
                         String node = parseNode(path);
@@ -268,19 +271,18 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
-    /**
-     * master node listener
-     */
     class MasterDataListener implements SubscribeListener {
         @Override
-        public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+        public void notify(Event event) {
+            final String path = event.path();
+            final Type type = event.type();
             if (registryClient.isMasterPath(path)) {
                 try {
-                    if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
+                    if (type.equals(Type.ADD)) {
                         logger.info("master node : {} added.", path);
                         updateMasterNodes();
                     }
-                    if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) {
+                    if (type.equals(Type.REMOVE)) {
                         logger.info("master node : {} down.", path);
                         updateMasterNodes();
                         alertDao.sendServerStopedAlert(1, path, "MASTER");
@@ -295,10 +297,10 @@ public class ServerNodeManager implements InitializingBean {
     private void updateMasterNodes() {
         SLOT_LIST.clear();
         this.masterNodes.clear();
-        String nodeLock = registryClient.getMasterLockPath();
+        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
         try {
             registryClient.getLock(nodeLock);
-            Set<String> currentNodes = registryClient.getMasterNodesDirectly();
+            Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
             List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
             syncMasterNodes(currentNodes, masterNodes);
         } catch (Exception e) {
@@ -328,7 +330,7 @@ public class ServerNodeManager implements InitializingBean {
      *
      * @param nodes master nodes
      */
-    private void syncMasterNodes(Set<String> nodes, List<Server> masterNodes) {
+    private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
         masterLock.lock();
         try {
             this.masterNodes.addAll(nodes);
@@ -353,7 +355,7 @@ public class ServerNodeManager implements InitializingBean {
      * @param workerGroup worker group
      * @param nodes worker nodes
      */
-    private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
+    private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
         workerGroupLock.lock();
         try {
             workerGroup = workerGroup.toLowerCase();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
index 34d6d9d..9e3bb0c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
@@ -19,36 +19,26 @@ package org.apache.dolphinscheduler.server.monitor;
 
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 
-/**
- * zk monitor server impl
- */
 @Component
 public class RegistryMonitorImpl extends AbstractMonitor {
 
-    /**
-     * zookeeper operator
-     */
-    private RegistryClient registryClient = RegistryClient.getInstance();
-
-    /**
-     * get active nodes map by path
-     *
-     * @param path path
-     * @return active nodes map
-     */
+    @Autowired
+    private RegistryClient registryClient;
+
     @Override
     protected Map<String, String> getActiveNodesByPath(String path) {
 
         Map<String, String> maps = new HashMap<>();
 
-        List<String> childrenList = registryClient.getChildrenKeys(path);
+        Collection<String> childrenList = registryClient.getChildrenKeys(path);
 
         if (childrenList == null) {
             return maps;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 61e8c40..67fd07a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -33,11 +33,11 @@ public class HeartBeatTask implements Runnable {
 
     private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
 
-    private Set<String> heartBeatPaths;
-    private RegistryClient registryClient;
+    private final Set<String> heartBeatPaths;
+    private final RegistryClient registryClient;
     private WorkerManagerThread workerManagerThread;
-    private String serverType;
-    private HeartBeat heartBeat;
+    private final String serverType;
+    private final HeartBeat heartBeat;
 
     public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
@@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
             }
 
             for (String heartBeatPath : heartBeatPaths) {
-                registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat());
+                registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
             }
         } catch (Throwable ex) {
             logger.error("error write heartbeat info", ex);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index fa186d0..96ec36b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -52,17 +52,11 @@ public class TaskCallbackService {
     private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
 
     /**
-     * zookeeper registry center
-     */
-    private RegistryClient registryClient;
-
-    /**
      * netty remoting client
      */
     private final NettyRemotingClient nettyRemotingClient;
 
     public TaskCallbackService() {
-        this.registryClient = RegistryClient.getInstance();
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b59e3ec..54ec49c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.lang.StringUtils;
 
+import java.io.IOException;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
@@ -73,6 +74,7 @@ public class WorkerRegistryClient {
      */
     private ScheduledExecutorService heartBeatExecutor;
 
+    @Autowired
     private RegistryClient registryClient;
 
     /**
@@ -86,7 +88,6 @@ public class WorkerRegistryClient {
     public void initWorkRegistry() {
         this.workerGroups = workerConfig.getWorkerGroups();
         this.startupTime = System.currentTimeMillis();
-        this.registryClient = RegistryClient.getInstance();
         this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
     }
 
@@ -121,7 +122,7 @@ public class WorkerRegistryClient {
     /**
      * remove registry info
      */
-    public void unRegistry() {
+    public void unRegistry() throws IOException {
         try {
             String address = getLocalAddress();
             Set<String> workerZkPaths = getWorkerZkPaths();
@@ -161,7 +162,7 @@ public class WorkerRegistryClient {
         return workerPaths;
     }
 
-    public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
+    public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
         registryClient.handleDeadServer(nodeSet, nodeType, opType);
     }
 
diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf
index 2e74b1e..96174d9 100755
--- a/dolphinscheduler-server/src/main/resources/config/install_config.conf
+++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf
@@ -93,11 +93,6 @@ dbname="dolphinscheduler"
 # ---------------------------------------------------------
 # Registry Server
 # ---------------------------------------------------------
-# Registry Server plugin dir. DolphinScheduler will find and load the registry plugin jar package from this dir.
-# For now default registry server is zookeeper, so the default value is `lib/plugin/registry/zookeeper`.
-# If you want to implement your own registry server, please see https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/registry_spi.html
-registryPluginDir="lib/plugin/registry/zookeeper"
-
 # Registry Server plugin name, should be a substring of `registryPluginDir`, DolphinScheduler use this for verifying configuration consistency
 registryPluginName="zookeeper"
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
index 80f75af..619dba8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -56,7 +56,7 @@ public class ExecutorDispatcherTest {
     }
 
     @Test
-    public void testDispatch() throws ExecuteException {
+    public void testDispatch() throws Exception {
         int port = 30000;
         final NettyServerConfig serverConfig = new NettyServerConfig();
         serverConfig.setListenPort(port);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 073df65..97c45f1 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -20,10 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
 
-import java.util.Arrays;
-import java.util.Date;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -33,13 +29,17 @@ import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecC
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -59,6 +59,7 @@ public class MasterRegistryClientTest {
     @Mock
     private MasterConfig masterConfig;
 
+    @Mock
     private RegistryClient registryClient;
 
     @Mock
@@ -72,13 +73,10 @@ public class MasterRegistryClientTest {
 
     @Before
     public void before() throws Exception {
-        PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
-        registryClient = PowerMockito.mock(RegistryClient.class);
         given(registryClient.getLock(Mockito.anyString())).willReturn(true);
-        given(registryClient.getMasterFailoverLockPath()).willReturn("/path");
         given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
         given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
-        doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString());
+        doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
         ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
 
         ProcessInstance processInstance = new ProcessInstance();
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index bc8b367..753dc31 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -42,6 +42,10 @@
             <groupId>org.apache.dolphinscheduler</groupId>
             <artifactId>dolphinscheduler-spi</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.quartz-scheduler</groupId>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
deleted file mode 100644
index ba74f88..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
-import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryException;
-import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
-
-import org.apache.commons.lang.StringUtils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * All business parties use this class to access the registry
- */
-public class RegistryCenter {
-
-    private static final Logger logger = LoggerFactory.getLogger(RegistryCenter.class);
-
-    private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
-    private Registry registry;
-
-    private IStoppable stoppable;
-
-    /**
-     * nodes namespace
-     */
-    protected static String NODES;
-
-    private RegistryPluginManager registryPluginManager;
-
-    protected static final String EMPTY = "";
-
-    private static final String REGISTRY_PREFIX = "registry";
-
-    private static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
-
-    private static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
-
-    private static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
-
-    private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
-
-    /**
-     * default registry plugin dir
-     */
-    private static final String REGISTRY_PLUGIN_PATH = "lib/plugin/registry";
-
-    private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
-
-    /**
-     * init node persist
-     */
-    public void init() {
-        if (isStarted.compareAndSet(false, true)) {
-            PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
-            Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
-
-            if (null == registryConfig || registryConfig.isEmpty()) {
-                throw new RegistryException("registry config param is null");
-            }
-            if (null == registryPluginManager) {
-                installRegistryPlugin(registryConfig.get(REGISTRY_PLUGIN_NAME));
-                registry = registryPluginManager.getRegistry();
-            }
-
-            registry.init(registryConfig);
-            initNodes();
-
-        }
-    }
-
-    /**
-     * init nodes
-     */
-    private void initNodes() {
-        persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
-        persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
-        persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
-    }
-
-    /**
-     * install registry plugin
-     */
-    private void installRegistryPlugin(String registryPluginName) {
-        DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
-        registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING));
-        if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) {
-            registryPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim());
-        }
-
-        if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) {
-            registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
-        }
-
-        registryPluginManager = new RegistryPluginManager(registryPluginName);
-
-        DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
-        try {
-            registryPluginLoader.loadPlugins();
-        } catch (Exception e) {
-            throw new RuntimeException("Load registry Plugin Failed !", e);
-        }
-    }
-
-    /**
-     * close
-     */
-    public void close() {
-        if (isStarted.compareAndSet(true, false) && registry != null) {
-            registry.close();
-        }
-    }
-
-    public void persist(String key, String value) {
-        registry.persist(key, value);
-    }
-
-    public void persistEphemeral(String key, String value) {
-        registry.persistEphemeral(key, value);
-    }
-
-    public void remove(String key) {
-        registry.remove(key);
-    }
-
-    public void update(String key, String value) {
-        registry.update(key, value);
-    }
-
-    public String get(String key) {
-        return registry.get(key);
-    }
-
-    public void subscribe(String path, SubscribeListener subscribeListener) {
-        registry.subscribe(path, subscribeListener);
-    }
-
-    public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
-        registry.addConnectionStateListener(registryConnectListener);
-    }
-
-    public boolean isExisted(String key) {
-        return registry.isExisted(key);
-    }
-
-    public boolean getLock(String key) {
-        return registry.acquireLock(key);
-    }
-
-    public boolean releaseLock(String key) {
-        return registry.releaseLock(key);
-    }
-
-    /**
-     * @return get dead server node parent path
-     */
-    public String getDeadZNodeParentPath() {
-        return REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
-    }
-
-    public void setStoppable(IStoppable stoppable) {
-        this.stoppable = stoppable;
-    }
-
-    public IStoppable getStoppable() {
-        return stoppable;
-    }
-
-    /**
-     * whether master path
-     *
-     * @param path path
-     * @return result
-     */
-    public boolean isMasterPath(String path) {
-        return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
-    }
-
-    /**
-     * get worker group path
-     *
-     * @param workerGroup workerGroup
-     * @return worker group path
-     */
-    public String getWorkerGroupPath(String workerGroup) {
-        return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
-    }
-
-    /**
-     * whether worker path
-     *
-     * @param path path
-     * @return result
-     */
-    public boolean isWorkerPath(String path) {
-        return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
-    }
-
-    /**
-     * get children nodes
-     *
-     * @param key key
-     * @return children nodes
-     */
-    public List<String> getChildrenKeys(final String key) {
-        return registry.getChildren(key);
-    }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 210ce21..f384678 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -22,60 +22,72 @@ import static org.apache.dolphinscheduler.common.Constants.COLON;
 import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
 import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
 import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
 import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
 import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
 import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
 import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import org.apache.commons.lang.StringUtils;
-
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.RegistryFactory;
+import org.apache.dolphinscheduler.registry.api.RegistryFactoryLoader;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
-/**
- * registry client singleton
- */
-public class RegistryClient extends RegistryCenter {
+import com.google.common.base.Strings;
 
+@Component
+public class RegistryClient {
     private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
 
-    private static RegistryClient registryClient = new RegistryClient();
-
-    private RegistryClient() {
-        super.init();
-    }
-
-    public static RegistryClient getInstance() {
-        return registryClient;
+    private static final String EMPTY = "";
+    private static final String REGISTRY_PREFIX = "registry";
+    private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
+    private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private Registry registry;
+    private IStoppable stoppable;
+
+    @PostConstruct
+    public void afterConstruct() {
+        start();
+        initNodes();
     }
 
-    /**
-     * get active master num
-     *
-     * @return active master number
-     */
     public int getActiveMasterNum() {
-        List<String> childrenList = new ArrayList<>();
+        Collection<String> childrenList = new ArrayList<>();
         try {
             // read master node parent path from conf
-            if (isExisted(getNodeParentPath(NodeType.MASTER))) {
-                childrenList = getChildrenKeys(getNodeParentPath(NodeType.MASTER));
+            if (exists(rootNodePath(NodeType.MASTER))) {
+                childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER));
             }
         } catch (Exception e) {
             logger.error("getActiveMasterNum error", e);
@@ -83,15 +95,9 @@ public class RegistryClient extends RegistryCenter {
         return childrenList.size();
     }
 
-    /**
-     * get server list.
-     *
-     * @param nodeType zookeeper node type
-     * @return server list
-     */
     public List<Server> getServerList(NodeType nodeType) {
-        Map<String, String> serverMaps = getServerMaps(nodeType);
-        String parentPath = getNodeParentPath(nodeType);
+        Map<String, String> serverMaps = getServerMaps(nodeType, false);
+        String parentPath = rootNodePath(nodeType);
 
         List<Server> serverList = new ArrayList<>();
         for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
@@ -119,40 +125,11 @@ public class RegistryClient extends RegistryCenter {
         return serverList;
     }
 
-    /**
-     * get server nodes.
-     *
-     * @param nodeType registry node type
-     * @return result : list<node>
-     */
-    public List<String> getServerNodes(NodeType nodeType) {
-        String path = getNodeParentPath(nodeType);
-        List<String> serverList = getChildrenKeys(path);
-        if (nodeType == NodeType.WORKER) {
-            List<String> workerList = new ArrayList<>();
-            for (String group : serverList) {
-                List<String> groupServers = getChildrenKeys(path + SINGLE_SLASH + group);
-                for (String groupServer : groupServers) {
-                    workerList.add(group + SINGLE_SLASH + groupServer);
-                }
-            }
-            serverList = workerList;
-        }
-        return serverList;
-    }
-
-    /**
-     * get server list map.
-     *
-     * @param nodeType zookeeper node type
-     * @param hostOnly host only
-     * @return result : {host : resource info}
-     */
     public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
         Map<String, String> serverMap = new HashMap<>();
         try {
-            String path = getNodeParentPath(nodeType);
-            List<String> serverList = getServerNodes(nodeType);
+            String path = rootNodePath(nodeType);
+            Collection<String> serverList = getServerNodes(nodeType);
             for (String server : serverList) {
                 String host = server;
                 if (nodeType == NodeType.WORKER && hostOnly) {
@@ -167,298 +144,194 @@ public class RegistryClient extends RegistryCenter {
         return serverMap;
     }
 
-    /**
-     * get server list map.
-     *
-     * @param nodeType zookeeper node type
-     * @return result : {host : resource info}
-     */
-    public Map<String, String> getServerMaps(NodeType nodeType) {
-        return getServerMaps(nodeType, false);
+    public boolean checkNodeExists(String host, NodeType nodeType) {
+        return getServerMaps(nodeType, true).keySet()
+                                            .stream()
+                                            .anyMatch(it -> it.contains(host));
     }
 
-    /**
-     * get server node set.
-     *
-     * @param nodeType zookeeper node type
-     * @param hostOnly host only
-     * @return result : set<host>
-     */
-    public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
-        Set<String> serverSet = new HashSet<>();
-        try {
-            List<String> serverList = getServerNodes(nodeType);
-            for (String server : serverList) {
-                String host = server;
-                if (nodeType == NodeType.WORKER && hostOnly) {
-                    host = server.split(SINGLE_SLASH)[1];
-                }
-                serverSet.add(host);
+    public void handleDeadServer(Collection<String> nodes, NodeType nodeType, String opType) {
+        nodes.forEach(node -> {
+            final String host = getHostByEventDataPath(node);
+            final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE;
+
+            if (opType.equals(DELETE_OP)) {
+                removeDeadServerByHost(host, type);
+            } else if (opType.equals(ADD_OP)) {
+                String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host;
+                // Add dead server info to zk dead server path : /dead-servers/
+                registry.put(deadServerPath, type + UNDERLINE + host, false);
+                logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node);
             }
-        } catch (Exception e) {
-            logger.error("get server node set failed", e);
-        }
-        return serverSet;
+        });
     }
 
-    /**
-     * get server node list.
-     *
-     * @param nodeType zookeeper node type
-     * @param hostOnly host only
-     * @return result : list<host>
-     */
-    public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
-        Set<String> serverSet = getServerNodeSet(nodeType, hostOnly);
-        List<String> serverList = new ArrayList<>(serverSet);
-        Collections.sort(serverList);
-        return serverList;
-    }
+    public boolean checkIsDeadServer(String node, String serverType) {
+        // ip_sequence_no
+        String[] zNodesPath = node.split("/");
+        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+        String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
 
-    /**
-     * check the zookeeper node already exists
-     *
-     * @param host host
-     * @param nodeType zookeeper node type
-     * @return true if exists
-     */
-    public boolean checkNodeExists(String host, NodeType nodeType) {
-        String path = getNodeParentPath(nodeType);
-        if (StringUtils.isEmpty(path)) {
-            logger.error("check zk node exists error, host:{}, zk node type:{}",
-                    host, nodeType);
-            return false;
-        }
-        Map<String, String> serverMaps = getServerMaps(nodeType, true);
-        for (String hostKey : serverMaps.keySet()) {
-            if (hostKey.contains(host)) {
-                return true;
-            }
-        }
-        return false;
+        return !exists(node) || exists(deadServerPath);
     }
 
-    /**
-     * @return get worker node parent path
-     */
-    protected String getWorkerNodeParentPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+    public Collection<String> getMasterNodesDirectly() {
+        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
     }
 
-    /**
-     * @return get master node parent path
-     */
-    protected String getMasterNodeParentPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+    public Collection<String> getWorkerGroupDirectly() {
+        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
     }
 
-    /**
-     * @return get dead server node parent path
-     */
-    protected String getDeadNodeParentPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+    public Collection<String> getWorkerGroupNodesDirectly(String workerGroup) {
+        return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
     }
 
     /**
-     * @return get master lock path
+     * get host ip:port, path format: parentPath/ip:port
+     *
+     * @param path path
+     * @return host ip:port, i.e. the last segment of the given path
      */
-    public String getMasterLockPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
+    public String getHostByEventDataPath(String path) {
+        checkArgument(!Strings.isNullOrEmpty(path), "path cannot be null or empty");
+
+        final String[] pathArray = path.split(SINGLE_SLASH);
+
+        checkArgument(pathArray.length >= 1, "cannot parse path: %s", path);
+
+        return pathArray[pathArray.length - 1];
     }
 
-    /**
-     * @param nodeType zookeeper node type
-     * @return get zookeeper node parent path
-     */
-    public String getNodeParentPath(NodeType nodeType) {
-        String path = "";
-        switch (nodeType) {
-            case MASTER:
-                return getMasterNodeParentPath();
-            case WORKER:
-                return getWorkerNodeParentPath();
-            case DEAD_SERVER:
-                return getDeadNodeParentPath();
-            default:
-                break;
+    public void close() throws IOException {
+        if (isStarted.compareAndSet(true, false) && registry != null) {
+            registry.close();
         }
-        return path;
     }
 
-    /**
-     * @return get master start up lock path
-     */
-    public String getMasterStartUpLockPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+    public void persistEphemeral(String key, String value) {
+        registry.put(key, value, true);
     }
 
-    /**
-     * @return get master failover lock path
-     */
-    public String getMasterFailoverLockPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+    public void remove(String key) {
+        registry.delete(key);
     }
 
-    /**
-     * @return get worker failover lock path
-     */
-    public String getWorkerFailoverLockPath() {
-        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+    public String get(String key) {
+        return registry.get(key);
     }
 
-    /**
-     * opType(add): if find dead server , then add to zk deadServerPath
-     * opType(delete): delete path from zk
-     *
-     * @param node node path
-     * @param nodeType master or worker
-     * @param opType delete or add
-     * @throws Exception errors
-     */
-    public void handleDeadServer(String node, NodeType nodeType, String opType) throws Exception {
-        String host = getHostByEventDataPath(node);
-        String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
-
-        //check server restart, if restart , dead server path in zk should be delete
-        if (opType.equals(DELETE_OP)) {
-            removeDeadServerByHost(host, type);
-
-        } else if (opType.equals(ADD_OP)) {
-            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
-            if (!isExisted(deadServerPath)) {
-                //add dead server info to zk dead server path : /dead-servers/
+    public void subscribe(String path, SubscribeListener listener) {
+        registry.subscribe(path, listener);
+    }
 
-                persist(deadServerPath, (type + UNDERLINE + host));
+    public void addConnectionStateListener(ConnectionListener listener) {
+        registry.addConnectionStateListener(listener);
+    }
 
-                logger.info("{} server dead , and {} added to zk dead server path success",
-                        nodeType, node);
-            }
-        }
+    public boolean exists(String key) {
+        return registry.exists(key);
+    }
 
+    public boolean getLock(String key) {
+        return registry.acquireLock(key);
     }
 
-    /**
-     * check dead server or not , if dead, stop self
-     *
-     * @param node node path
-     * @param serverType master or worker prefix
-     * @return true if not exists
-     * @throws Exception errors
-     */
-    public boolean checkIsDeadServer(String node, String serverType) throws Exception {
-        // ip_sequence_no
-        String[] zNodesPath = node.split("\\/");
-        String ipSeqNo = zNodesPath[zNodesPath.length - 1];
-        String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
+    public boolean releaseLock(String key) {
+        return registry.releaseLock(key);
+    }
 
-        return !isExisted(node) || isExisted(deadServerPath);
+    public void setStoppable(IStoppable stoppable) {
+        this.stoppable = stoppable;
     }
 
-    /**
-     * get master nodes directly
-     *
-     * @return master nodes
-     */
-    public Set<String> getMasterNodesDirectly() {
-        List<String> masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
-        return new HashSet<>(masters);
+    public IStoppable getStoppable() {
+        return stoppable;
     }
 
-    /**
-     * get worker nodes directly
-     *
-     * @return master nodes
-     */
-    public Set<String> getWorkerNodesDirectly() {
-        List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
-        return new HashSet<>(workers);
+    public boolean isMasterPath(String path) {
+        return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS);
     }
 
-    /**
-     * get worker group directly
-     *
-     * @return worker group nodes
-     */
-    public Set<String> getWorkerGroupDirectly() {
-        List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
-        return new HashSet<>(workers);
+    public boolean isWorkerPath(String path) {
+        return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS);
     }
 
-    /**
-     * get worker group nodes
-     */
-    public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
-        List<String> workers = getChildrenKeys(getWorkerGroupPath(workerGroup));
-        return new HashSet<>(workers);
+    public Collection<String> getChildrenKeys(final String key) {
+        return registry.children(key);
     }
 
-    /**
-     * opType(add): if find dead server , then add to zk deadServerPath
-     * opType(delete): delete path from zk
-     *
-     * @param nodeSet node path set
-     * @param nodeType master or worker
-     * @param opType delete or add
-     * @throws Exception errors
-     */
-    public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
+    public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
+        try {
+            return getServerNodes(nodeType).stream().map(server -> {
+                if (nodeType == NodeType.WORKER && hostOnly) {
+                    return server.split(SINGLE_SLASH)[1];
+                }
+                return server;
+            }).collect(Collectors.toSet());
+        } catch (Exception e) {
+            throw new RegistryException("Failed to get server node: " + nodeType, e);
+        }
+    }
 
-        String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
-        for (String node : nodeSet) {
-            String host = getHostByEventDataPath(node);
-            //check server restart, if restart , dead server path in zk should be delete
-            if (opType.equals(DELETE_OP)) {
-                removeDeadServerByHost(host, type);
+    private void start() {
+        if (isStarted.compareAndSet(false, true)) {
+            PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
+            final Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
 
-            } else if (opType.equals(ADD_OP)) {
-                String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
-                if (!isExisted(deadServerPath)) {
-                    //add dead server info to zk dead server path : /dead-servers/
-                    persist(deadServerPath, (type + UNDERLINE + host));
-                    logger.info("{} server dead , and {} added to registry dead server path success",
-                            nodeType, node);
-                }
+            if (null == registryConfig || registryConfig.isEmpty()) {
+                throw new RegistryException("registry config param is null");
             }
-
+            final String pluginName = registryConfig.get(REGISTRY_PLUGIN_NAME);
+            final Map<String, RegistryFactory> factories = RegistryFactoryLoader.load();
+            if (!factories.containsKey(pluginName)) {
+                throw new RegistryException("No such registry plugin: " + pluginName);
+            }
+            registry = factories.get(pluginName).create();
+            registry.start(registryConfig);
         }
+    }
 
+    private void initNodes() {
+        registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
+        registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
+        registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false);
     }
 
-    /**
-     * get host ip:port, string format: parentPath/ip:port
-     *
-     * @param path path
-     * @return host ip:port, string format: parentPath/ip:port
-     */
-    public String getHostByEventDataPath(String path) {
-        if (StringUtils.isEmpty(path)) {
-            logger.error("empty path!");
-            return "";
-        }
-        String[] pathArray = path.split(SINGLE_SLASH);
-        if (pathArray.length < 1) {
-            logger.error("parse ip error: {}", path);
-            return "";
+    private String rootNodePath(NodeType type) {
+        switch (type) {
+            case MASTER:
+                return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+            case WORKER:
+                return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+            case DEAD_SERVER:
+                return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+            default:
+                throw new IllegalStateException("Should not reach here");
         }
-        return pathArray[pathArray.length - 1];
+    }
 
+    private Collection<String> getServerNodes(NodeType nodeType) {
+        final String path = rootNodePath(nodeType);
+        final Collection<String> serverList = getChildrenKeys(path);
+        if (nodeType != NodeType.WORKER) {
+            return serverList;
+        }
+        return serverList.stream().flatMap(group ->
+            getChildrenKeys(path + SINGLE_SLASH + group)
+                .stream()
+                .map(it -> group + SINGLE_SLASH + it)
+        ).collect(Collectors.toList());
     }
 
-    /**
-     * remove dead server by host
-     *
-     * @param host host
-     * @param serverType serverType
-     */
-    public void removeDeadServerByHost(String host, String serverType) {
-        List<String> deadServers = getChildrenKeys(getDeadZNodeParentPath());
+    private void removeDeadServerByHost(String host, String serverType) {
+        Collection<String> deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS);
         for (String serverPath : deadServers) {
             if (serverPath.startsWith(serverType + UNDERLINE + host)) {
-                String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+                String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
                 remove(server);
                 logger.info("{} server {} deleted from zk dead server path success", serverType, host);
             }
         }
     }
-
 }
diff --git a/dolphinscheduler-service/src/main/resources/registry.properties b/dolphinscheduler-service/src/main/resources/registry.properties
index b00dfc0..f534a65 100644
--- a/dolphinscheduler-service/src/main/resources/registry.properties
+++ b/dolphinscheduler-service/src/main/resources/registry.properties
@@ -15,18 +15,5 @@
 # limitations under the License.
 #
 
-#registry.plugin.dir config the Registry Plugin dir.
-registry.plugin.dir=lib/plugin/registry
-
 registry.plugin.name=zookeeper
 registry.servers=127.0.0.1:2181
-
-#maven.local.repository=/usr/local/localRepository
-
-#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
-#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
-
-#registry timeout
-#registry.session.timeout.ms=30000
-#registry.connection.timeout.ms=7500
-#registry.block.until.connected.wait=600
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java
deleted file mode 100644
index 2dc2193..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
-import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
-
-import static org.mockito.BDDMockito.given;
-
-import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.spi.register.Registry;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.common.collect.Sets;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
-public class RegistryClientTest {
-
-    private RegistryClient registryClient;
-
-    @Test
-    public void test() throws Exception {
-        Registry registry = PowerMockito.mock(Registry.class);
-        PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
-        PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
-        PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true);
-        PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089"));
-
-        PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
-        registryClient = PowerMockito.mock(RegistryClient.class);
-        registryClient.persist("/key", "");
-        registryClient.update("/key", "");
-        registryClient.releaseLock("/key");
-        registryClient.getChildrenKeys("/key");
-        registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, DELETE_OP);
-        registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, ADD_OP);
-        //registryClient.removeDeadServerByHost("127.0.0.1:8089","master");
-        registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, DELETE_OP);
-        registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, ADD_OP);
-        registryClient.checkIsDeadServer("master/127.0.0.1","master");
-        given(registry.getChildren("/nodes/worker")).willReturn(Arrays.asList("worker_127.0.0.1:8089"));
-        given(registry.getChildren("/nodes/worker/worker_127.0.0.1:8089")).willReturn(Arrays.asList("default"));
-
-        registryClient.checkNodeExists("127.0.0.1",NodeType.WORKER);
-
-        registryClient.getServerList(NodeType.MASTER);
-
-    }
-
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java
deleted file mode 100644
index a35252c..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
-import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class RegistryPluginTest {
-
-    @Test
-    public void testLoadPlugin() throws Exception {
-        DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
-        String path = DolphinPluginLoader.class.getClassLoader().getResource("").getPath();
-
-        String registryPluginZkPath = path + "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml";
-        registryPluginManagerConfig.setPlugins(registryPluginZkPath);
-        RegistryPluginManager registryPluginManager = new RegistryPluginManager("zookeeper");
-
-        DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
-        registryPluginLoader.loadPlugins();
-        Assert.assertNotNull(registryPluginManager.getRegistry());
-
-    }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
index 1c5f1c5..a073c53 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.spi;
 import static java.util.Collections.emptyList;
 
 import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
 import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
 
 /**
@@ -43,14 +42,6 @@ public interface DolphinSchedulerPlugin {
     }
 
     /**
-     * get registry plugin factory
-     * @return registry factory
-     */
-    default Iterable<RegistryFactory> getRegisterFactorys() {
-        return emptyList();
-    }
-
-    /**
      * get task plugin factory
      * @return registry factory
      */
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java
deleted file mode 100644
index 6675ef6..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-public interface ConnectStateListener {
-
-    void notify(RegistryConnectState state);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java
deleted file mode 100644
index a6aa32d..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Monitor the type of data changes
- */
-public enum DataChangeEvent {
-
-    ADD("ADD", 1),
-    REMOVE("REMOVE", 2),
-    UPDATE("UPDATE",3);
-
-    private String type;
-
-    private int value;
-
-    DataChangeEvent(String type, int value) {
-        this.type = type;
-        this.value = value;
-    }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
deleted file mode 100644
index 94b13e6..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-import java.util.HashMap;
-
-/**
- * The registry node monitors subscriptions
- */
-public class ListenerManager {
-
-    /**
-     * All message subscriptions must be subscribed uniformly at startup.
-     * A node path only supports one listener
-     */
-    private static HashMap<String, SubscribeListener> listeners = new HashMap<>();
-
-    /**
-     * Check whether the key has been monitored
-     */
-    public static boolean checkHasListeners(String path) {
-        return null != listeners.get(path);
-    }
-
-    /**
-     * add listener(A node can only be monitored by one listener)
-     */
-    public static void addListener(String path, SubscribeListener listener) {
-        listeners.put(path, listener);
-    }
-
-    /**
-     * remove listener
-     */
-    public static void removeListener(String path) {
-        listeners.remove(path);
-    }
-
-    /**
-     *
-     *After the data changes, it is distributed to the corresponding listener for processing
-     */
-    public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) {
-        SubscribeListener notifyListener = listeners.get(key);
-        if (null == notifyListener) {
-            return;
-        }
-        notifyListener.notify(path, data, dataChangeEvent);
-    }
-
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java
deleted file mode 100644
index 11fe25a..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.dolphinscheduler.spi.register;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The final display of all registry component data must follow a tree structure.
- * Therefore, some registry may need to do a layer of internal conversion, such as Etcd
- */
-public interface Registry {
-
-    /**
-     * initialize registry center.
-     */
-    void init(Map<String, String> registerData);
-
-    /**
-     * close registry
-     */
-    void close();
-
-    /**
-     * subscribe registry data change, a path can only be monitored by one listener
-     */
-    boolean subscribe(String path, SubscribeListener subscribeListener);
-
-    /**
-     * unsubscribe
-     */
-    void unsubscribe(String path);
-
-    /**
-     * Registry status monitoring, globally unique. Only one is allowed to subscribe.
-     */
-    void addConnectionStateListener(RegistryConnectListener registryConnectListener);
-
-    /**
-     * get key
-     */
-    String get(String key);
-
-    /**
-     * delete
-     */
-    void remove(String key);
-
-    /**
-     * persist data
-     */
-    void persist(String key, String value);
-
-    /**
-     *persist ephemeral data
-     */
-    void persistEphemeral(String key, String value);
-
-    /**
-     * update data
-     */
-    void update(String key, String value);
-
-    /**
-     * get children keys
-     */
-    List<String> getChildren(String path);
-
-    /**
-     * Judge node is exist or not.
-     */
-    boolean isExisted(String key);
-
-    /**
-     * delete kay
-     */
-    boolean delete(String key);
-
-    /**
-     * Obtain a distributed lock
-     * todo It is best to add expiration time, and automatically release the lock after expiration
-     */
-    boolean acquireLock(String key);
-
-    /**
-     * release key
-     */
-    boolean releaseLock(String key);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java
deleted file mode 100644
index 83385f8..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-public interface RegistryConnectListener {
-
-    void notify(RegistryConnectState newState);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java
deleted file mode 100644
index e085e6d..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * All registry connection status must be converted to this
- */
-public enum RegistryConnectState {
-    CONNECTED("connected", 1),
-    RECONNECTED("reconnected", 2),
-    SUSPENDED("suspended", 3),
-    LOST("lost", 4);
-
-    private String description;
-
-    private int state;
-
-    RegistryConnectState(String description, int state) {
-        this.description = description;
-        this.state = state;
-    }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java
deleted file mode 100644
index 884f005..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * registry exception
- */
-public class RegistryException extends RuntimeException {
-
-    public RegistryException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RegistryException(String message) {
-        super(message);
-    }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java
deleted file mode 100644
index 244c0f4..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Registry the component factory, all registry must implement this interface
- */
-public interface RegistryFactory {
-
-    /**
-     * get registry component name
-     */
-    String getName();
-
-    /**
-     * get registry
-     */
-    Registry create();
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java
deleted file mode 100644
index 211795f..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
-import org.apache.dolphinscheduler.spi.classloader.ThreadContextClassLoader;
-import org.apache.dolphinscheduler.spi.plugin.AbstractDolphinPluginManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The plug-in address of the registry needs to be configured.
- * Multi-registries are not supported.
- * When the plug-in directory contains multiple plug-ins, only the configured plug-in will be used.
- * todo It’s not good to put it here, consider creating a separate API module for each plugin
- */
-public class RegistryPluginManager extends AbstractDolphinPluginManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(RegistryPluginManager.class);
-
-    private RegistryFactory registryFactory;
-
-    public static Registry registry;
-
-    private String registerPluginName;
-
-    public RegistryPluginManager(String registerPluginName) {
-        this.registerPluginName = registerPluginName;
-    }
-
-    @Override
-    public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) {
-        for (RegistryFactory registryFactory : dolphinSchedulerPlugin.getRegisterFactorys()) {
-            logger.info("Registering Registry Plugin '{}'", registryFactory.getName());
-            if (registerPluginName.equals(registryFactory.getName())) {
-                this.registryFactory = registryFactory;
-                loadRegistry();
-                return;
-            }
-        }
-        if (null == registry) {
-            throw new RegistryException(String.format("not found %s registry plugin ", registerPluginName));
-        }
-    }
-
-    /**
-     * load registry
-     */
-    private void loadRegistry() {
-        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(registryFactory.getClass().getClassLoader())) {
-            registry = registryFactory.create();
-        }
-    }
-
-    /**
-     * get registry
-     * @return registry
-     */
-    public  Registry getRegistry() {
-        if (null == registry) {
-            throw new RegistryException("not install registry");
-        }
-        return registry;
-    }
-
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
deleted file mode 100644
index 3db7f2e..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Registration center subscription. All listeners must implement this interface
- */
-public interface SubscribeListener {
-
-    /**
-     * Processing logic when the subscription node changes
-     */
-    void notify(String path, String data, DataChangeEvent dataChangeEvent);
-
-}
diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
index 4a41770..27d9a55 100644
--- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
+++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
@@ -86,15 +86,6 @@ public class StandaloneServer {
     private static void startRegistry() throws Exception {
         final TestingServer server = new TestingServer(true);
         System.setProperty("registry.servers", server.getConnectString());
-
-        final Path registryPath = Paths.get(
-                StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
-                "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"
-        ).toAbsolutePath();
-        if (Files.exists(registryPath)) {
-            System.setProperty("registry.plugin.binding", registryPath.toString());
-            System.setProperty("registry.plugin.dir", "");
-        }
     }
 
     private static void startDatabase() throws IOException, SQLException {
diff --git a/pom.xml b/pom.xml
index 0c42a7a..a475181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
         <byte-buddy.version>1.9.16</byte-buddy.version>
         <java-websocket.version>1.5.1</java-websocket.version>
         <py4j.version>0.10.9</py4j.version>
+        <auto-service.version>1.0.1</auto-service.version>
     </properties>
 
     <dependencyManagement>
@@ -231,7 +232,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.dolphinscheduler</groupId>
-                <artifactId>dolphinscheduler-registry-plugin</artifactId>
+                <artifactId>dolphinscheduler-registry</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>
@@ -271,6 +277,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.dolphinscheduler</groupId>
+                <artifactId>dolphinscheduler-registry-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-framework</artifactId>
                 <version>${curator.version}</version>
@@ -690,6 +702,13 @@
                 <artifactId>py4j</artifactId>
                 <version>${py4j.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.google.auto.service</groupId>
+                <artifactId>auto-service</artifactId>
+                <version>${auto-service.version}</version>
+                <optional>true</optional>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -1234,7 +1253,7 @@
     <modules>
         <module>dolphinscheduler-spi</module>
         <module>dolphinscheduler-alert-plugin</module>
-        <module>dolphinscheduler-registry-plugin</module>
+        <module>dolphinscheduler-registry</module>
         <module>dolphinscheduler-task-plugin</module>
         <module>dolphinscheduler-ui</module>
         <module>dolphinscheduler-server</module>
diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh
index e62f54e..1798a5d 100755
--- a/script/env/dolphinscheduler_env.sh
+++ b/script/env/dolphinscheduler_env.sh
@@ -20,7 +20,7 @@ export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
 export SPARK_HOME1=/opt/soft/spark1
 export SPARK_HOME2=/opt/soft/spark2
 export PYTHON_HOME=/opt/soft/python
-export JAVA_HOME=/opt/soft/java
+export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
 export HIVE_HOME=/opt/soft/hive
 export FLINK_HOME=/opt/soft/flink
 export DATAX_HOME=/opt/soft/datax