You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/11/07 07:18:14 UTC
[dolphinscheduler] branch dev updated: Refactor registry plugin and
simplify its usage (#6712)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5400abc Refactor registry plugin and simplify its usage (#6712)
5400abc is described below
commit 5400abc74fdb16717d22c15f964cad1a6dfd899e
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Sun Nov 7 15:18:10 2021 +0800
Refactor registry plugin and simplify its usage (#6712)
---
.../registry.properties => .github/CODEOWNERS | 8 +-
.../conf/dolphinscheduler/registry.properties.tpl | 8 -
docker/build/startup-init-conf.sh | 1 -
docker/docker-swarm/config.env.sh | 1 -
docker/docker-swarm/docker-compose.yml | 2 +-
.../dolphinscheduler/templates/_helpers.tpl | 8 +-
.../api/service/impl/MonitorServiceImpl.java | 75 ++--
.../api/service/impl/WorkerGroupServiceImpl.java | 29 +-
.../api/utils/RegistryCenterUtils.java | 82 ----
.../api/controller/AbstractControllerTest.java | 17 +-
.../api/controller/SchedulerControllerTest.java | 3 -
.../api/controller/TaskInstanceControllerTest.java | 3 -
.../api/controller/TenantControllerTest.java | 3 -
.../api/controller/WorkerGroupControllerTest.java | 7 +-
.../api/service/WorkerGroupServiceTest.java | 96 +----
.../api/utils/RegistryCenterUtilsTest.java | 43 --
.../apache/dolphinscheduler/common/Constants.java | 2 -
.../dolphinscheduler/common/enums/NodeType.java | 11 +-
.../src/main/provisio/dolphinscheduler.xml | 5 -
.../ZookeeperConnectionStateListener.java | 56 ---
.../zookeeper/ZookeeperRegistryPlugin.java | 34 --
.../dolphinscheduler-registry-api/pom.xml | 32 ++
.../registry/api/ConnectionListener.java | 25 ++
.../registry/api/ConnectionState.java | 27 ++
.../dolphinscheduler/registry/api/Event.java | 48 +++
.../dolphinscheduler/registry/api/Registry.java | 48 +++
.../registry/api/RegistryException.java | 31 ++
.../registry/api/RegistryFactory.java | 26 ++
.../registry/api/RegistryFactoryLoader.java | 35 ++
.../registry/api/SubscribeListener.java | 24 ++
.../dolphinscheduler-registry-zookeeper/pom.xml | 18 +-
.../registry/zookeeper/ZookeeperConfiguration.java | 0
.../ZookeeperConnectionStateListener.java | 54 +++
.../registry/zookeeper/ZookeeperRegistry.java | 229 ++++------
.../zookeeper/ZookeeperRegistryFactory.java | 14 +-
.../registry/zookeeper/ZookeeperRegistryTest.java | 23 +-
.../dolphinscheduler-registry-plugins/pom.xml | 36 ++
.../pom.xml | 25 +-
.../master/registry/MasterRegistryClient.java | 73 ++--
.../registry/MasterRegistryDataListener.java | 40 +-
.../server/master/registry/ServerNodeManager.java | 58 +--
.../server/monitor/RegistryMonitorImpl.java | 22 +-
.../server/registry/HeartBeatTask.java | 10 +-
.../worker/processor/TaskCallbackService.java | 6 -
.../worker/registry/WorkerRegistryClient.java | 7 +-
.../src/main/resources/config/install_config.conf | 5 -
.../master/dispatch/ExecutorDispatcherTest.java | 2 +-
.../master/registry/MasterRegistryClientTest.java | 16 +-
dolphinscheduler-service/pom.xml | 4 +
.../service/registry/RegistryCenter.java | 243 -----------
.../service/registry/RegistryClient.java | 467 ++++++++-------------
.../src/main/resources/registry.properties | 13 -
.../service/registry/RegistryClientTest.java | 74 ----
.../service/registry/RegistryPluginTest.java | 45 --
.../spi/DolphinSchedulerPlugin.java | 9 -
.../spi/register/ConnectStateListener.java | 23 -
.../spi/register/DataChangeEvent.java | 37 --
.../spi/register/ListenerManager.java | 66 ---
.../dolphinscheduler/spi/register/Registry.java | 102 -----
.../spi/register/RegistryConnectListener.java | 23 -
.../spi/register/RegistryConnectState.java | 37 --
.../spi/register/RegistryException.java | 32 --
.../spi/register/RegistryFactory.java | 34 --
.../spi/register/RegistryPluginManager.java | 82 ----
.../spi/register/SubscribeListener.java | 30 --
.../dolphinscheduler/server/StandaloneServer.java | 9 -
pom.xml | 23 +-
script/env/dolphinscheduler_env.sh | 2 +-
68 files changed, 882 insertions(+), 1901 deletions(-)
diff --git a/dolphinscheduler-standalone-server/src/main/resources/registry.properties b/.github/CODEOWNERS
similarity index 74%
rename from dolphinscheduler-standalone-server/src/main/resources/registry.properties
rename to .github/CODEOWNERS
index 3c33799..9def4be 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/registry.properties
+++ b/.github/CODEOWNERS
@@ -15,8 +15,6 @@
# limitations under the License.
#
-# This file is only to override the production configurations in standalone server.
-
-registry.plugin.dir=./dolphinscheduler-dist/target/dolphinscheduler-dist-2.0.0-SNAPSHOT/lib/plugin/registry/zookeeper
-registry.plugin.name=zookeeper
-registry.servers=127.0.0.1:2181
+dolphinscheduler/dolphinscheduler-e2e @kezhenxu94
+dolphinscheduler/dolphinscheduler-registry @kezhenxu94
+dolphinscheduler/dolphinscheduler-standalone-server @kezhenxu94
diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl b/docker/build/conf/dolphinscheduler/registry.properties.tpl
index 40836f5..9ee8add 100644
--- a/docker/build/conf/dolphinscheduler/registry.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl
@@ -15,13 +15,5 @@
# limitations under the License.
#
-#registry.plugin.dir config the Registry Plugin dir.
-registry.plugin.dir=${REGISTRY_PLUGIN_DIR}
-
registry.plugin.name=${REGISTRY_PLUGIN_NAME}
registry.servers=${REGISTRY_SERVERS}
-
-#maven.local.repository=/usr/local/localRepository
-
-#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
-#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
diff --git a/docker/build/startup-init-conf.sh b/docker/build/startup-init-conf.sh
index 35f70bb..4717834 100755
--- a/docker/build/startup-init-conf.sh
+++ b/docker/build/startup-init-conf.sh
@@ -37,7 +37,6 @@ export DATABASE_PARAMS=${DATABASE_PARAMS:-"characterEncoding=utf8"}
#============================================================================
# Registry
#============================================================================
-export REGISTRY_PLUGIN_DIR=${REGISTRY_PLUGIN_DIR:-"lib/plugin/registry"}
export REGISTRY_PLUGIN_NAME=${REGISTRY_PLUGIN_NAME:-"zookeeper"}
export REGISTRY_SERVERS=${REGISTRY_SERVERS:-"127.0.0.1:2181"}
diff --git a/docker/docker-swarm/config.env.sh b/docker/docker-swarm/config.env.sh
index afc09b0..0c79a04 100755
--- a/docker/docker-swarm/config.env.sh
+++ b/docker/docker-swarm/config.env.sh
@@ -39,7 +39,6 @@ DATABASE_PARAMS=characterEncoding=utf8
#============================================================================
# Registry
#============================================================================
-REGISTRY_PLUGIN_DIR=lib/plugin/registry
REGISTRY_PLUGIN_NAME=zookeeper
REGISTRY_SERVERS=dolphinscheduler-zookeeper:2181
diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml
index 3f63e79..2f67ae8 100644
--- a/docker/docker-swarm/docker-compose.yml
+++ b/docker/docker-swarm/docker-compose.yml
@@ -140,4 +140,4 @@ volumes:
dolphinscheduler-worker-data:
dolphinscheduler-logs:
dolphinscheduler-shared-local:
- dolphinscheduler-resource-local:
\ No newline at end of file
+ dolphinscheduler-resource-local:
diff --git a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
index 2b0786f..5ef83a7 100644
--- a/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
+++ b/docker/kubernetes/dolphinscheduler/templates/_helpers.tpl
@@ -166,12 +166,6 @@ Create a database environment variables.
Create a registry environment variables.
*/}}
{{- define "dolphinscheduler.registry.env_vars" -}}
-- name: REGISTRY_PLUGIN_DIR
- {{- if .Values.zookeeper.enabled }}
- value: "lib/plugin/registry"
- {{- else }}
- value: {{ .Values.externalRegistry.registryPluginDir }}
- {{- end }}
- name: REGISTRY_PLUGIN_NAME
{{- if .Values.zookeeper.enabled }}
value: "zookeeper"
@@ -239,4 +233,4 @@ Create a fsFileResourcePersistence volumeMount.
- mountPath: {{ default "/dolphinscheduler" .Values.common.configmap.RESOURCE_UPLOAD_PATH | quote }}
name: {{ include "dolphinscheduler.fullname" . }}-fs-file
{{- end -}}
-{{- end -}}
\ No newline at end of file
+{{- end -}}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
index cb3b0b2..ccec49b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java
@@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.MonitorService;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.List;
@@ -48,6 +48,9 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
@Autowired
private MonitorDBDao monitorDBDao;
+ @Autowired
+ private RegistryClient registryClient;
+
/**
* query database state
*
@@ -55,7 +58,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return data base state
*/
@Override
- public Map<String,Object> queryDatabaseState(User loginUser) {
+ public Map<String, Object> queryDatabaseState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
@@ -74,13 +77,13 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return master information list
*/
@Override
- public Map<String,Object> queryMaster(User loginUser) {
+ public Map<String, Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<Server> masterServers = getServerListFromRegistry(true);
result.put(Constants.DATA_LIST, masterServers);
- putMsg(result,Status.SUCCESS);
+ putMsg(result, Status.SUCCESS);
return result;
}
@@ -92,12 +95,10 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return zookeeper information list
*/
@Override
- public Map<String,Object> queryZookeeperState(User loginUser) {
+ public Map<String, Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
- List<ZookeeperRecord> zookeeperRecordList = RegistryCenterUtils.zookeeperInfoList();
-
- result.put(Constants.DATA_LIST, zookeeperRecordList);
+ result.put(Constants.DATA_LIST, null);
putMsg(result, Status.SUCCESS);
return result;
@@ -111,46 +112,48 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic
* @return worker information list
*/
@Override
- public Map<String,Object> queryWorker(User loginUser) {
+ public Map<String, Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<WorkerServerModel> workerServers = getServerListFromRegistry(false)
- .stream()
- .map((Server server) -> {
- WorkerServerModel model = new WorkerServerModel();
- model.setId(server.getId());
- model.setHost(server.getHost());
- model.setPort(server.getPort());
- model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
- model.setResInfo(server.getResInfo());
- model.setCreateTime(server.getCreateTime());
- model.setLastHeartbeatTime(server.getLastHeartbeatTime());
- return model;
- })
- .collect(Collectors.toList());
+ .stream()
+ .map((Server server) -> {
+ WorkerServerModel model = new WorkerServerModel();
+ model.setId(server.getId());
+ model.setHost(server.getHost());
+ model.setPort(server.getPort());
+ model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
+ model.setResInfo(server.getResInfo());
+ model.setCreateTime(server.getCreateTime());
+ model.setLastHeartbeatTime(server.getLastHeartbeatTime());
+ return model;
+ })
+ .collect(Collectors.toList());
Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
- .stream()
- .collect(Collectors.toMap(
- (WorkerServerModel worker) -> {
- String[] s = worker.getZkDirectories().iterator().next().split("/");
- return s[s.length - 1];
- }
- , Function.identity()
- , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
- oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
- return oldOne;
- }));
+ .stream()
+ .collect(Collectors.toMap(
+ (WorkerServerModel worker) -> {
+ String[] s = worker.getZkDirectories().iterator().next().split("/");
+ return s[s.length - 1];
+ }
+ , Function.identity()
+ , (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
+ oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
+ return oldOne;
+ }));
result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
- putMsg(result,Status.SUCCESS);
+ putMsg(result, Status.SUCCESS);
return result;
}
@Override
public List<Server> getServerListFromRegistry(boolean isMaster) {
- return isMaster ? RegistryCenterUtils.getMasterServers() : RegistryCenterUtils.getWorkerServers();
+ return isMaster
+ ? registryClient.getServerList(NodeType.MASTER)
+ : registryClient.getServerList(NodeType.WORKER);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 379af64..06e6958 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,14 +30,17 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -58,10 +60,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class);
@Autowired
- WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupMapper workerGroupMapper;
@Autowired
- ProcessInstanceMapper processInstanceMapper;
+ private ProcessInstanceMapper processInstanceMapper;
+
+ @Autowired
+ private RegistryClient registryClient;
/**
* create or update a worker group
@@ -139,7 +144,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
// check zookeeper
String workerGroupPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + workerGroup.getName();
- return RegistryCenterUtils.isNodeExisted(workerGroupPath);
+ return registryClient.exists(workerGroupPath);
}
/**
@@ -149,7 +154,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
- Map<String, String> serverMaps = RegistryCenterUtils.getServerMaps(NodeType.WORKER, true);
+ Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
@@ -250,11 +255,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
// worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
- List<String> workerGroupList = null;
+ Collection<String> workerGroupList = null;
try {
- workerGroupList = RegistryCenterUtils.getChildrenNodes(workerPath);
+ workerGroupList = registryClient.getChildrenKeys(workerPath);
} catch (Exception e) {
- logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging);
+ logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
}
if (CollectionUtils.isEmpty(workerGroupList)) {
@@ -268,9 +273,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
- List<String> childrenNodes = null;
+ Collection<String> childrenNodes = null;
try {
- childrenNodes = RegistryCenterUtils.getChildrenNodes(workerGroupPath);
+ childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
} catch (Exception e) {
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
}
@@ -281,7 +286,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
wg.setName(workerGroup);
if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
- String registeredValue = RegistryCenterUtils.getNodeData(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.get(0));
+ String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7]));
wg.setSystemDefault(true);
@@ -328,7 +333,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
- List<String> serverNodeList = RegistryCenterUtils.getServerNodeList(NodeType.WORKER, true);
+ Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java
deleted file mode 100644
index 71e0456..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtils.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils;
-
-import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * monitor zookeeper info todo registry-spi
- * fixme Some of the information obtained in the api belongs to the unique information of zk.
- * I am not sure whether there is a good abstraction method. This is related to whether the specific plug-in is provided.
- */
-public class RegistryCenterUtils {
-
- private static RegistryClient registryClient = RegistryClient.getInstance();
-
- /**
- * @return zookeeper info list
- */
- public static List<ZookeeperRecord> zookeeperInfoList() {
- return null;
- }
-
- /**
- * get master servers
- *
- * @return master server information
- */
- public static List<Server> getMasterServers() {
- return registryClient.getServerList(NodeType.MASTER);
- }
-
- /**
- * master construct is the same with worker, use the master instead
- *
- * @return worker server informations
- */
- public static List<Server> getWorkerServers() {
- return registryClient.getServerList(NodeType.WORKER);
- }
-
- public static Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
- return registryClient.getServerMaps(nodeType, hostOnly);
- }
-
- public static List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
- return registryClient.getServerNodeList(nodeType, hostOnly);
- }
-
- public static boolean isNodeExisted(String key) {
- return registryClient.isExisted(key);
- }
-
- public static List<String> getChildrenNodes(final String key) {
- return registryClient.getChildrenKeys(key);
- }
-
- public static String getNodeData(String key) {
- return registryClient.get(key);
- }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
index 41a1cc3..dcc9da3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
@@ -20,11 +20,9 @@ package org.apache.dolphinscheduler.api.controller;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.SessionService;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
@@ -36,11 +34,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@@ -51,11 +44,8 @@ import org.springframework.web.context.WebApplicationContext;
/**
* abstract controller test
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockRunnerDelegate(SpringRunner.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplicationServer.class)
-@PrepareForTest({ RegistryCenterUtils.class, RegistryClient.class })
-@PowerMockIgnore({"javax.management.*"})
public class AbstractControllerTest {
public static final String SESSION_ID = "sessionId";
@@ -74,9 +64,6 @@ public class AbstractControllerTest {
@Before
public void setUp() {
- PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
- PowerMockito.mockStatic(RegistryCenterUtils.class);
-
mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
createSession();
@@ -98,7 +85,7 @@ public class AbstractControllerTest {
String session = sessionService.createSession(loginUser, "127.0.0.1");
sessionId = session;
- Assert.assertTrue(!StringUtils.isEmpty(session));
+ Assert.assertFalse(StringUtils.isEmpty(session));
}
public Map<String, Object> success() {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
index 62022d8..a0a1772 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
@@ -48,9 +48,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-/**
- * scheduler controller test
- */
public class SchedulerControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(SchedulerControllerTest.class);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
index 486707c..a61e7d3 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java
@@ -50,9 +50,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-/**
- * task instance controller test
- */
public class TaskInstanceControllerTest extends AbstractControllerTest {
@InjectMocks
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
index 14cd52f..7f675a5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
@@ -37,9 +37,6 @@ import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-/**
- * tenant controller test
- */
public class TenantControllerTest extends AbstractControllerTest {
private static Logger logger = LoggerFactory.getLogger(TenantControllerTest.class);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index f6e79bf..873236f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-import org.apache.dolphinscheduler.api.utils.RegistryCenterUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.HashMap;
import java.util.Map;
@@ -59,12 +59,15 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
@MockBean
private ProcessInstanceMapper processInstanceMapper;
+ @MockBean
+ private RegistryClient registryClient;
+
@Test
public void testSaveWorkerGroup() throws Exception {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
- PowerMockito.when(RegistryCenterUtils.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
+ PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 0c76835..1dfcc64 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
+import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
@@ -33,105 +34,32 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-/**
- * worker group service test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
-@PowerMockIgnore({"javax.management.*"})
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = ApiApplicationServer.class)
public class WorkerGroupServiceTest {
+ @MockBean
+ private RegistryClient registryClient;
- @InjectMocks
+ @Autowired
private WorkerGroupServiceImpl workerGroupService;
- @Mock
+ @MockBean
private WorkerGroupMapper workerGroupMapper;
- @Mock
+ @MockBean
private ProcessInstanceMapper processInstanceMapper;
-
private String groupName = "groupName000001";
- /* @Before
- public void init() {
- ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
- zookeeperConfig.setDsRoot("/dolphinscheduler_qzw");
- Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
-
- String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
-
- List<String> workerGroupStrList = new ArrayList<>();
- workerGroupStrList.add("default");
- workerGroupStrList.add("test");
- Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath)).thenReturn(workerGroupStrList);
-
- List<String> defaultAddressList = new ArrayList<>();
- defaultAddressList.add("192.168.220.188:1234");
- defaultAddressList.add("192.168.220.189:1234");
-
- Mockito.when(zookeeperCachedOperator.getChildrenNodes(workerPath + "/default")).thenReturn(defaultAddressList);
-
- Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
- }
-
-*//**
- * create or update a worker group
- *//*
- @Test
- public void testSaveWorkerGroup() {
- // worker server maps
- Map<String, String> serverMaps = new HashMap<>();
- serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214");
- Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps);
-
- User user = new User();
- // general user add
- user.setUserType(UserType.GENERAL_USER);
- Map<String, Object> result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
- Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG));
-
- // success
- user.setUserType(UserType.ADMIN_USER);
- result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234");
- Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
- // group name exist
- Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2));
- Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList());
- result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234");
- Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS));
- }*/
-
- /**
- * query worker group paging
- */
- /* @Test
- public void testQueryAllGroupPaging() {
- User user = new User();
- // general user add
- user.setUserType(UserType.ADMIN_USER);
- Map<String, Object> result = workerGroupService.queryAllGroupPaging(user, 1, 10, null);
- PageInfo<WorkerGroup> pageInfo = (PageInfo) result.get(Constants.DATA_LIST);
- Assert.assertEquals(pageInfo.getLists().size(), 1);
- }*/
-
- @Before
- public void before() {
- PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
- }
-
@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java
deleted file mode 100644
index b8b4945..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/RegistryCenterUtilsTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils;
-
-import org.apache.dolphinscheduler.common.model.Server;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * zookeeper monitor utils test
- */
-@Ignore
-public class RegistryCenterUtilsTest {
-
- @Test
- public void testGetMasterList(){
- List<Server> masterServerList = RegistryCenterUtils.getMasterServers();
- List<Server> workerServerList = RegistryCenterUtils.getWorkerServers();
-
- Assert.assertTrue(masterServerList.size() >= 0);
- Assert.assertTrue(workerServerList.size() >= 0);
- }
-
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index fa9a490..410070e 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -90,8 +90,6 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
- public static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
- public static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
public static final String REGISTRY_SERVERS = "registry.servers";
/**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
index acc3c02..cb247cf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/NodeType.java
@@ -16,15 +16,6 @@
*/
package org.apache.dolphinscheduler.common.enums;
-/**
- * zk node type
- */
public enum NodeType {
-
- /**
- * 0 master node;
- * 1 worker node;
- * 2 dead_server node;
- */
- MASTER, WORKER, DEAD_SERVER;
+ MASTER, WORKER, DEAD_SERVER
}
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index 9ecd9ef..d7119d0 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -75,11 +75,6 @@
<unpack/>
</artifact>
</artifactSet>
- <artifactSet to="lib/plugin/registry/zookeeper">
- <artifact id="${project.groupId}:dolphinscheduler-registry-zookeeper:zip:${project.version}">
- <unpack/>
- </artifact>
- </artifactSet>
<!-- Task Plugins -->
<artifactSet to="lib/plugin/task/datax">
<artifact id="${project.groupId}:dolphinscheduler-task-datax:zip:${project.version}">
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
deleted file mode 100644
index cda98ef..0000000
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperConnectionStateListener implements ConnectionStateListener {
-
- private static final Logger logger = LoggerFactory.getLogger(ZookeeperConnectionStateListener.class);
-
- private RegistryConnectListener registryConnectListener;
-
- public ZookeeperConnectionStateListener(RegistryConnectListener registryConnectListener) {
- this.registryConnectListener = registryConnectListener;
- }
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
-
- if (newState == ConnectionState.LOST) {
- logger.error("connection lost from zookeeper");
- registryConnectListener.notify(RegistryConnectState.LOST);
- } else if (newState == ConnectionState.RECONNECTED) {
- logger.info("reconnected to zookeeper");
- registryConnectListener.notify(RegistryConnectState.RECONNECTED);
- } else if (newState == ConnectionState.SUSPENDED) {
- logger.warn("zookeeper connection SUSPENDED");
- registryConnectListener.notify(RegistryConnectState.SUSPENDED);
- }
-
- }
-
-}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java
deleted file mode 100644
index 85723ad..0000000
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryPlugin.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-
-import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * zookeeper registry plugin
- */
-public class ZookeeperRegistryPlugin implements DolphinSchedulerPlugin {
-
- @Override
- public Iterable<RegistryFactory> getRegisterFactorys() {
- return ImmutableList.of(new ZookeeperRegistryFactory());
- }
-}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
new file mode 100644
index 0000000..1032710
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to Apache Software Foundation (ASF) under one or more contributor
+ ~ license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright
+ ~ ownership. Apache Software Foundation (ASF) licenses this file to you under
+ ~ the Apache License, Version 2.0 (the "License"); you may
+ ~ not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler-registry</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dolphinscheduler-registry-api</artifactId>
+</project>
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java
new file mode 100644
index 0000000..eaebc81
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+@FunctionalInterface
+public interface ConnectionListener {
+ void onUpdate(ConnectionState newState);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java
new file mode 100644
index 0000000..fef3bca
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ConnectionState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+public enum ConnectionState {
+ CONNECTED,
+ RECONNECTED,
+ SUSPENDED,
+ DISCONNECTED
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java
new file mode 100644
index 0000000..be781db
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+@Getter
+@Setter
+@Builder
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class Event {
+ private String key;
+ private String path;
+ private String data;
+ private Type type;
+
+ public enum Type {
+ ADD,
+ REMOVE,
+ UPDATE
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
new file mode 100644
index 0000000..6057a7e
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+
+public interface Registry extends Closeable {
+ void start(Map<String, String> config);
+
+ boolean subscribe(String path, SubscribeListener listener);
+
+ void unsubscribe(String path);
+
+ void addConnectionStateListener(ConnectionListener listener);
+
+ String get(String key);
+
+ void put(String key, String value, boolean deleteOnDisconnect);
+
+ void delete(String key);
+
+ Collection<String> children(String key);
+
+ boolean exists(String key);
+
+ boolean acquireLock(String key);
+
+ boolean releaseLock(String key);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java
new file mode 100644
index 0000000..b88fe25
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+public final class RegistryException extends RuntimeException {
+
+ public RegistryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RegistryException(String message) {
+ super(message);
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java
new file mode 100644
index 0000000..6903e95
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+public interface RegistryFactory {
+ String name();
+
+ Registry create();
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java
new file mode 100644
index 0000000..45d4596
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryFactoryLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Function;
+import java.util.stream.StreamSupport;
+
+public final class RegistryFactoryLoader {
+ public static Map<String, RegistryFactory> load() {
+ final ServiceLoader<RegistryFactory> factories = ServiceLoader.load(RegistryFactory.class);
+ return StreamSupport.stream(factories.spliterator(), false)
+ .collect(toMap(RegistryFactory::name, Function.identity()));
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java
new file mode 100644
index 0000000..2432eb1
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to Apache Software Foundation (ASF) under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Apache Software Foundation (ASF) licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dolphinscheduler.registry.api;
+
+public interface SubscribeListener {
+ void notify(Event event);
+}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
similarity index 88%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
index 5ad7ee9..42690b9 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
@@ -19,18 +19,19 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>dolphinscheduler-registry-plugin</artifactId>
+ <artifactId>dolphinscheduler-registry-plugins</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
-
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
- <!-- can be load as a Alert Plugin when development and run server in IDE -->
- <packaging>dolphinscheduler-plugin</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
@@ -57,7 +58,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
-
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
@@ -76,11 +76,5 @@
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
-
</dependencies>
-
- <build>
- <finalName>dolphinscheduler-registry-zookeeper-${project.version}</finalName>
- </build>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
similarity index 100%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConfiguration.java
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
new file mode 100644
index 0000000..9526e0e
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.zookeeper;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@RequiredArgsConstructor
+public final class ZookeeperConnectionStateListener implements ConnectionStateListener {
+ private final ConnectionListener listener;
+
+ @Override
+ public void stateChanged(CuratorFramework client,
+ org.apache.curator.framework.state.ConnectionState newState) {
+ switch (newState) {
+ case LOST:
+ log.warn("Registry disconnected");
+ listener.onUpdate(ConnectionState.DISCONNECTED);
+ break;
+ case RECONNECTED:
+ log.info("Registry reconnected");
+ listener.onUpdate(ConnectionState.RECONNECTED);
+ break;
+ case SUSPENDED:
+ log.warn("Registry suspended");
+ listener.onUpdate(ConnectionState.SUSPENDED);
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
similarity index 53%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index e84666a..89cb280 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -28,21 +28,19 @@ import static org.apache.dolphinscheduler.plugin.registry.zookeeper.ZookeeperCon
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.ListenerManager;
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryException;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.api.transaction.TransactionOp;
+import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@@ -56,28 +54,18 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Strings;
-public class ZookeeperRegistry implements Registry {
+public final class ZookeeperRegistry implements Registry {
private CuratorFramework client;
- /**
- * treeCache map
- * k-subscribe key
- * v-listener
- */
- private Map<String, TreeCache> treeCacheMap = new HashMap<>();
-
- /**
- * Distributed lock map
- */
- private ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
-
- /**
- * build retry policy
- */
+ private final Map<String, TreeCache> treeCacheMap = new ConcurrentHashMap<>();
+
+ private static final ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
+
private static RetryPolicy buildRetryPolicy(Map<String, String> registerData) {
int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(registerData.get(BASE_SLEEP_TIME.getName()));
int maxRetries = MAX_RETRIES.getParameterValue(registerData.get(MAX_RETRIES.getName()));
@@ -85,35 +73,32 @@ public class ZookeeperRegistry implements Registry {
return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepMs);
}
- /**
- * build digest
- */
private static void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
builder.authorization(DIGEST.getName(), digest.getBytes(StandardCharsets.UTF_8))
- .aclProvider(new ACLProvider() {
- @Override
- public List<ACL> getDefaultAcl() {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
-
- @Override
- public List<ACL> getAclForPath(final String path) {
- return ZooDefs.Ids.CREATOR_ALL_ACL;
- }
- });
+ .aclProvider(new ACLProvider() {
+ @Override
+ public List<ACL> getDefaultAcl() {
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+
+ @Override
+ public List<ACL> getAclForPath(final String path) {
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+ });
}
@Override
- public void init(Map<String, String> registerData) {
-
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(SERVERS.getParameterValue(registerData.get(SERVERS.getName())))
- .retryPolicy(buildRetryPolicy(registerData))
- .namespace(NAME_SPACE.getParameterValue(registerData.get(NAME_SPACE.getName())))
- .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(registerData.get(SESSION_TIMEOUT_MS.getName())))
- .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(registerData.get(CONNECTION_TIMEOUT_MS.getName())));
-
- String digest = DIGEST.getParameterValue(registerData.get(DIGEST.getName()));
+ public void start(Map<String, String> config) {
+ CuratorFrameworkFactory.Builder builder =
+ CuratorFrameworkFactory.builder()
+ .connectString(SERVERS.getParameterValue(config.get(SERVERS.getName())))
+ .retryPolicy(buildRetryPolicy(config))
+ .namespace(NAME_SPACE.getParameterValue(config.get(NAME_SPACE.getName())))
+ .sessionTimeoutMs(SESSION_TIMEOUT_MS.getParameterValue(config.get(SESSION_TIMEOUT_MS.getName())))
+ .connectionTimeoutMs(CONNECTION_TIMEOUT_MS.getParameterValue(config.get(CONNECTION_TIMEOUT_MS.getName())));
+
+ String digest = DIGEST.getParameterValue(config.get(DIGEST.getName()));
if (!Strings.isNullOrEmpty(digest)) {
buildDigest(builder, digest);
}
@@ -121,7 +106,7 @@ public class ZookeeperRegistry implements Registry {
client.start();
try {
- if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(registerData.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
+ if (!client.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT_MS.getParameterValue(config.get(BLOCK_UNTIL_CONNECTED_WAIT_MS.getName())), MILLISECONDS)) {
client.close();
throw new RegistryException("zookeeper connect timeout");
}
@@ -132,55 +117,26 @@ public class ZookeeperRegistry implements Registry {
}
@Override
- public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
- client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(registryConnectListener));
+ public void addConnectionStateListener(ConnectionListener listener) {
+ client.getConnectionStateListenable().addListener(new ZookeeperConnectionStateListener(listener));
}
@Override
- public boolean subscribe(String path, SubscribeListener subscribeListener) {
- if (null != treeCacheMap.get(path)) {
- return false;
- }
- TreeCache treeCache = new TreeCache(client, path);
- TreeCacheListener treeCacheListener = (client, event) -> {
- TreeCacheEvent.Type type = event.getType();
- DataChangeEvent eventType = null;
- String dataPath = null;
- switch (type) {
- case NODE_ADDED:
- dataPath = event.getData().getPath();
- eventType = DataChangeEvent.ADD;
- break;
- case NODE_UPDATED:
- eventType = DataChangeEvent.UPDATE;
- dataPath = event.getData().getPath();
- break;
- case NODE_REMOVED:
- eventType = DataChangeEvent.REMOVE;
- dataPath = event.getData().getPath();
- break;
- default:
- }
- if (null != eventType && null != dataPath) {
- ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType);
- }
- };
- treeCache.getListenable().addListener(treeCacheListener);
- treeCacheMap.put(path, treeCache);
+ public boolean subscribe(String path, SubscribeListener listener) {
+ final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
+ treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
try {
treeCache.start();
} catch (Exception e) {
- throw new RegistryException("start zookeeper tree cache error", e);
+ treeCacheMap.remove(path);
+ throw new RegistryException("Failed to subscribe listener for key: " + path, e);
}
- ListenerManager.addListener(path, subscribeListener);
return true;
}
@Override
public void unsubscribe(String path) {
- TreeCache treeCache = treeCacheMap.get(path);
- treeCache.close();
- ListenerManager.removeListener(path);
+ CloseableUtils.closeQuietly(treeCacheMap.get(path));
}
@Override
@@ -193,12 +149,7 @@ public class ZookeeperRegistry implements Registry {
}
@Override
- public void remove(String key) {
- delete(key);
- }
-
- @Override
- public boolean isExisted(String key) {
+ public boolean exists(String key) {
try {
return null != client.checkExists().forPath(key);
} catch (Exception e) {
@@ -207,47 +158,22 @@ public class ZookeeperRegistry implements Registry {
}
@Override
- public void persist(String key, String value) {
- try {
- if (isExisted(key)) {
- update(key, value);
- return;
- }
- client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+ public void put(String key, String value, boolean deleteOnDisconnect) {
+ final CreateMode mode = deleteOnDisconnect ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
- } catch (Exception e) {
- throw new RegistryException("zookeeper persist error", e);
- }
- }
-
- @Override
- public void persistEphemeral(String key, String value) {
try {
- if (isExisted(key)) {
- update(key, value);
- return;
- }
- client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
+ client.create()
+ .orSetData()
+ .creatingParentsIfNeeded()
+ .withMode(mode)
+ .forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
- throw new RegistryException("zookeeper persist ephemeral error", e);
+ throw new RegistryException("Failed to put registry key: " + key, e);
}
}
@Override
- public void update(String key, String value) {
- try {
- if (!isExisted(key)) {
- return;
- }
- TransactionOp transactionOp = client.transactionOp();
- client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
- } catch (Exception e) {
- throw new RegistryException("zookeeper update error", e);
- }
- }
-
- @Override
- public List<String> getChildren(String key) {
+ public List<String> children(String key) {
try {
List<String> result = client.getChildren().forPath(key);
result.sort(Comparator.reverseOrder());
@@ -258,23 +184,20 @@ public class ZookeeperRegistry implements Registry {
}
@Override
- public boolean delete(String nodePath) {
+ public void delete(String nodePath) {
try {
client.delete()
- .deletingChildrenIfNeeded()
- .forPath(nodePath);
- } catch (KeeperException.NoNodeException ignore) {
- // the node is not exist, we can believe the node has been removed
-
+ .deletingChildrenIfNeeded()
+ .forPath(nodePath);
+ } catch (KeeperException.NoNodeException ignored) {
+ // Is already deleted or does not exist
} catch (Exception e) {
- throw new RegistryException("zookeeper delete key error", e);
+ throw new RegistryException("Failed to delete registry key: " + nodePath, e);
}
- return true;
}
@Override
public boolean acquireLock(String key) {
-
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
try {
interProcessMutex.acquire();
@@ -291,7 +214,6 @@ public class ZookeeperRegistry implements Registry {
throw new RegistryException("zookeeper release lock error", e);
}
}
-
}
@Override
@@ -311,22 +233,35 @@ public class ZookeeperRegistry implements Registry {
return true;
}
- public CuratorFramework getClient() {
- return client;
- }
-
@Override
public void close() {
- treeCacheMap.forEach((key, value) -> value.close());
- waitForCacheClose(500);
+ treeCacheMap.values().forEach(CloseableUtils::closeQuietly);
CloseableUtils.closeQuietly(client);
}
- private void waitForCacheClose(long millis) {
- try {
- Thread.sleep(millis);
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
+ static final class EventAdaptor extends Event {
+ public EventAdaptor(TreeCacheEvent event, String key) {
+ key(key);
+
+ switch (event.getType()) {
+ case NODE_ADDED:
+ type(Type.ADD);
+ break;
+ case NODE_UPDATED:
+ type(Type.UPDATE);
+ break;
+ case NODE_REMOVED:
+ type(Type.REMOVE);
+ break;
+ default:
+ break;
+ }
+
+ final ChildData data = event.getData();
+ if (data != null) {
+ path(data.getPath());
+ data(new String(data.getData()));
+ }
}
}
}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
similarity index 76%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
index 1ecf3e0..949df21 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryFactory.java
@@ -17,16 +17,16 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryFactory;
-/**
- * Zookeeper registry factory
- */
-public class ZookeeperRegistryFactory implements RegistryFactory {
+import com.google.auto.service.AutoService;
+
+@AutoService(RegistryFactory.class)
+public final class ZookeeperRegistryFactory implements RegistryFactory {
@Override
- public String getName() {
+ public String name() {
return "zookeeper";
}
diff --git a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
similarity index 87%
rename from dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index a5dc33b..8442c02 100644
--- a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.test.TestingServer;
@@ -50,18 +50,18 @@ public class ZookeeperRegistryTest {
server = new TestingServer(true);
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put(ZookeeperConfiguration.SERVERS.getName(), server.getConnectString());
- registry.init(registryConfig);
- registry.persist("/sub", "");
+ registry.start(registryConfig);
+ registry.put("/sub", "", false);
}
@Test
public void persistTest() {
- registry.persist("/nodes/m1", "");
- registry.persist("/nodes/m2", "");
- Assert.assertEquals(Arrays.asList("m2", "m1"), registry.getChildren("/nodes"));
- Assert.assertTrue(registry.isExisted("/nodes/m1"));
+ registry.put("/nodes/m1", "", false);
+ registry.put("/nodes/m2", "", false);
+ Assert.assertEquals(Arrays.asList("m2", "m1"), registry.children("/nodes"));
+ Assert.assertTrue(registry.exists("/nodes/m1"));
registry.delete("/nodes/m2");
- Assert.assertFalse(registry.isExisted("/nodes/m2"));
+ Assert.assertFalse(registry.exists("/nodes/m2"));
}
@Test
@@ -112,10 +112,9 @@ public class ZookeeperRegistryTest {
}
- class TestListener implements SubscribeListener {
-
+ static class TestListener implements SubscribeListener {
@Override
- public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+ public void notify(Event event) {
logger.info("I'm test listener");
}
}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
new file mode 100644
index 0000000..4f3db56
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to Apache Software Foundation (ASF) under one or more contributor
+ ~ license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright
+ ~ ownership. Apache Software Foundation (ASF) licenses this file to you under
+ ~ the Apache License, Version 2.0 (the "License"); you may
+ ~ not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dolphinscheduler-registry</artifactId>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>dolphinscheduler-registry-plugins</artifactId>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>dolphinscheduler-registry-zookeeper</module>
+ </modules>
+</project>
diff --git a/dolphinscheduler-registry-plugin/pom.xml b/dolphinscheduler-registry/pom.xml
similarity index 74%
rename from dolphinscheduler-registry-plugin/pom.xml
rename to dolphinscheduler-registry/pom.xml
index 2f8bb0d..8ce7c24 100644
--- a/dolphinscheduler-registry-plugin/pom.xml
+++ b/dolphinscheduler-registry/pom.xml
@@ -24,20 +24,25 @@
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-registry-plugin</artifactId>
+ <artifactId>dolphinscheduler-registry</artifactId>
<packaging>pom</packaging>
+ <modules>
+ <module>dolphinscheduler-registry-api</module>
+ <module>dolphinscheduler-registry-plugins</module>
+ </modules>
+
<dependencies>
- <!-- dolphinscheduler -->
<dependency>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-spi</artifactId>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.22</version>
<scope>provided</scope>
</dependency>
</dependencies>
-
- <modules>
- <module>dolphinscheduler-registry-zookeeper</module>
- </modules>
-</project>
\ No newline at end of file
+</project>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 7ef9432..6c47f84 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@@ -42,11 +43,10 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
import org.apache.commons.lang.StringUtils;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
@@ -79,6 +79,7 @@ public class MasterRegistryClient {
@Autowired
private ProcessService processService;
+ @Autowired
private RegistryClient registryClient;
/**
@@ -104,12 +105,11 @@ public class MasterRegistryClient {
public void init() {
this.startupTime = System.currentTimeMillis();
- this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
public void start() {
- String nodeLock = registryClient.getMasterStartUpLockPath();
+ String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
@@ -117,7 +117,7 @@ public class MasterRegistryClient {
// master registry
registry();
String registryPath = getMasterPath();
- registryClient.handleDeadServer(registryPath, NodeType.MASTER, Constants.DELETE_OP);
+ registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP);
// init system node
@@ -143,7 +143,8 @@ public class MasterRegistryClient {
}
public void closeRegistry() {
- unRegistry();
+ // TODO unsubscribe MasterRegistryDataListener
+ deregister();
}
/**
@@ -167,7 +168,7 @@ public class MasterRegistryClient {
return;
}
// handle dead server
- registryClient.handleDeadServer(path, nodeType, Constants.ADD_OP);
+ registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
//failover server
if (failover) {
@@ -209,9 +210,9 @@ public class MasterRegistryClient {
private String getFailoverLockPath(NodeType nodeType) {
switch (nodeType) {
case MASTER:
- return registryClient.getMasterFailoverLockPath();
+ return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
case WORKER:
- return registryClient.getWorkerFailoverLockPath();
+ return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
default:
return "";
}
@@ -289,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null
- || !checkOwner
- || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+ || !checkOwner
+ || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down.
if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.",
- taskInstance.getProcessInstanceId(), taskInstance.getId());
+ taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
- .buildTaskInstanceRelatedInfo(taskInstance)
- .buildProcessInstanceRelatedInfo(processInstance)
- .create();
+ .buildTaskInstanceRelatedInfo(taskInstance)
+ .buildProcessInstanceRelatedInfo(processInstance)
+ .create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
@@ -348,14 +349,6 @@ public class MasterRegistryClient {
logger.info("master failover end");
}
- public void blockAcquireMutex() {
- registryClient.getLock(registryClient.getMasterLockPath());
- }
-
- public void releaseLock() {
- registryClient.releaseLock(registryClient.getMasterLockPath());
- }
-
/**
* registry
*/
@@ -364,36 +357,24 @@ public class MasterRegistryClient {
localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMasterMaxCpuloadAvg(),
- masterConfig.getMasterReservedMemory(),
- Sets.newHashSet(getMasterPath()),
- Constants.MASTER_TYPE,
- registryClient);
+ masterConfig.getMasterMaxCpuloadAvg(),
+ masterConfig.getMasterReservedMemory(),
+ Sets.newHashSet(getMasterPath()),
+ Constants.MASTER_TYPE,
+ registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
- registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
+ registryClient.addConnectionStateListener(newState -> {
+ if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
+ registryClient.persistEphemeral(localNodePath, "");
+ }
+ });
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
- class MasterRegistryConnectStateListener implements RegistryConnectListener {
-
- @Override
- public void notify(RegistryConnectState newState) {
- if (RegistryConnectState.RECONNECTED == newState) {
- registryClient.persistEphemeral(localNodePath, "");
- }
- if (RegistryConnectState.SUSPENDED == newState) {
- registryClient.persistEphemeral(localNodePath, "");
- }
- }
- }
-
- /**
- * remove registry info
- */
- public void unRegistry() {
+ public void deregister() {
try {
String address = getLocalAddress();
String localNodePath = getMasterPath();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index 4fd50a0..cb5b6be 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -22,42 +22,43 @@ import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHED
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
+
public class MasterRegistryDataListener implements SubscribeListener {
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryDataListener.class);
- private MasterRegistryClient masterRegistryClient;
+ private final MasterRegistryClient masterRegistryClient;
public MasterRegistryDataListener() {
masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
}
@Override
- public void notify(String path, String data, DataChangeEvent event) {
+ public void notify(Event event) {
+ final String path = event.path();
+ if (Strings.isNullOrEmpty(path)) {
+ return;
+ }
//monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
- handleMasterEvent(event, path);
+ handleMasterEvent(event);
} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
//monitor worker
- handleWorkerEvent(event, path);
+ handleWorkerEvent(event);
}
}
- /**
- * monitor master
- *
- * @param event event
- * @param path path
- */
- public void handleMasterEvent(DataChangeEvent event, String path) {
- switch (event) {
+ public void handleMasterEvent(Event event) {
+ final String path = event.path();
+ switch (event.type()) {
case ADD:
logger.info("master node added : {}", path);
break;
@@ -69,14 +70,9 @@ public class MasterRegistryDataListener implements SubscribeListener {
}
}
- /**
- * monitor worker
- *
- * @param event event
- * @param path path
- */
- public void handleWorkerEvent(DataChangeEvent event, String path) {
- switch (event) {
+ public void handleWorkerEvent(Event event) {
+ final String path = event.path();
+ switch (event.type()) {
case ADD:
logger.info("worker node added : {}", path);
break;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 1bceeb7..b7e904b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -27,16 +27,18 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.Event.Type;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.apache.dolphinscheduler.spi.register.DataChangeEvent;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -101,10 +103,8 @@ public class ServerNodeManager implements InitializingBean {
*/
private ScheduledExecutorService executorService;
- /**
- * zk client
- */
- private RegistryClient registryClient = RegistryClient.getInstance();
+ @Autowired
+ private RegistryClient registryClient;
/**
* eg : /node/worker/group/127.0.0.1:xxx
@@ -153,11 +153,11 @@ public class ServerNodeManager implements InitializingBean {
*/
executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
- /**
+ /*
* init MasterNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
- /**
+ /*
* init WorkerNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
@@ -167,15 +167,15 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper
*/
public void load() {
- /**
+ /*
* master nodes from zookeeper
*/
updateMasterNodes();
- /**
+ /*
* worker group nodes from zookeeper
*/
- Set<String> workerGroups = registryClient.getWorkerGroupDirectly();
+ Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
}
@@ -218,25 +218,28 @@ public class ServerNodeManager implements InitializingBean {
class WorkerDataListener implements SubscribeListener {
@Override
- public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+ public void notify(Event event) {
+ final String path = event.path();
+ final Type type = event.type();
+ final String data = event.data();
if (registryClient.isWorkerPath(path)) {
try {
- if (dataChangeEvent == DataChangeEvent.ADD) {
+ if (type == Type.ADD) {
logger.info("worker group node : {} added.", path);
String group = parseGroup(path);
- Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes);
- } else if (dataChangeEvent == DataChangeEvent.REMOVE) {
+ } else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
String group = parseGroup(path);
- Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
- } else if (dataChangeEvent == DataChangeEvent.UPDATE) {
+ } else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path);
- Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
+ Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
String node = parseNode(path);
@@ -268,19 +271,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
- /**
- * master node listener
- */
class MasterDataListener implements SubscribeListener {
@Override
- public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
+ public void notify(Event event) {
+ final String path = event.path();
+ final Type type = event.type();
if (registryClient.isMasterPath(path)) {
try {
- if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
+ if (type.equals(Type.ADD)) {
logger.info("master node : {} added.", path);
updateMasterNodes();
}
- if (dataChangeEvent.equals(DataChangeEvent.REMOVE)) {
+ if (type.equals(Type.REMOVE)) {
logger.info("master node : {} down.", path);
updateMasterNodes();
alertDao.sendServerStopedAlert(1, path, "MASTER");
@@ -295,10 +297,10 @@ public class ServerNodeManager implements InitializingBean {
private void updateMasterNodes() {
SLOT_LIST.clear();
this.masterNodes.clear();
- String nodeLock = registryClient.getMasterLockPath();
+ String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
try {
registryClient.getLock(nodeLock);
- Set<String> currentNodes = registryClient.getMasterNodesDirectly();
+ Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
syncMasterNodes(currentNodes, masterNodes);
} catch (Exception e) {
@@ -328,7 +330,7 @@ public class ServerNodeManager implements InitializingBean {
*
* @param nodes master nodes
*/
- private void syncMasterNodes(Set<String> nodes, List<Server> masterNodes) {
+ private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
this.masterNodes.addAll(nodes);
@@ -353,7 +355,7 @@ public class ServerNodeManager implements InitializingBean {
* @param workerGroup worker group
* @param nodes worker nodes
*/
- private void syncWorkerGroupNodes(String workerGroup, Set<String> nodes) {
+ private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
workerGroupLock.lock();
try {
workerGroup = workerGroup.toLowerCase();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
index 34d6d9d..9e3bb0c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/RegistryMonitorImpl.java
@@ -19,36 +19,26 @@ package org.apache.dolphinscheduler.server.monitor;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-/**
- * zk monitor server impl
- */
@Component
public class RegistryMonitorImpl extends AbstractMonitor {
- /**
- * zookeeper operator
- */
- private RegistryClient registryClient = RegistryClient.getInstance();
-
- /**
- * get active nodes map by path
- *
- * @param path path
- * @return active nodes map
- */
+ @Autowired
+ private RegistryClient registryClient;
+
@Override
protected Map<String, String> getActiveNodesByPath(String path) {
Map<String, String> maps = new HashMap<>();
- List<String> childrenList = registryClient.getChildrenKeys(path);
+ Collection<String> childrenList = registryClient.getChildrenKeys(path);
if (childrenList == null) {
return maps;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 61e8c40..67fd07a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -33,11 +33,11 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
- private Set<String> heartBeatPaths;
- private RegistryClient registryClient;
+ private final Set<String> heartBeatPaths;
+ private final RegistryClient registryClient;
private WorkerManagerThread workerManagerThread;
- private String serverType;
- private HeartBeat heartBeat;
+ private final String serverType;
+ private final HeartBeat heartBeat;
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
@@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
}
for (String heartBeatPath : heartBeatPaths) {
- registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat());
+ registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index fa186d0..96ec36b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -52,17 +52,11 @@ public class TaskCallbackService {
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
- * zookeeper registry center
- */
- private RegistryClient registryClient;
-
- /**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService() {
- this.registryClient = RegistryClient.getInstance();
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b59e3ec..54ec49c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
@@ -73,6 +74,7 @@ public class WorkerRegistryClient {
*/
private ScheduledExecutorService heartBeatExecutor;
+ @Autowired
private RegistryClient registryClient;
/**
@@ -86,7 +88,6 @@ public class WorkerRegistryClient {
public void initWorkRegistry() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startupTime = System.currentTimeMillis();
- this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
@@ -121,7 +122,7 @@ public class WorkerRegistryClient {
/**
* remove registry info
*/
- public void unRegistry() {
+ public void unRegistry() throws IOException {
try {
String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths();
@@ -161,7 +162,7 @@ public class WorkerRegistryClient {
return workerPaths;
}
- public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
+ public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
registryClient.handleDeadServer(nodeSet, nodeType, opType);
}
diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf
index 2e74b1e..96174d9 100755
--- a/dolphinscheduler-server/src/main/resources/config/install_config.conf
+++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf
@@ -93,11 +93,6 @@ dbname="dolphinscheduler"
# ---------------------------------------------------------
# Registry Server
# ---------------------------------------------------------
-# Registry Server plugin dir. DolphinScheduler will find and load the registry plugin jar package from this dir.
-# For now default registry server is zookeeper, so the default value is `lib/plugin/registry/zookeeper`.
-# If you want to implement your own registry server, please see https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/registry_spi.html
-registryPluginDir="lib/plugin/registry/zookeeper"
-
# Registry Server plugin name, should be a substring of `registryPluginDir`, DolphinScheduler use this for verifying configuration consistency
registryPluginName="zookeeper"
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
index 80f75af..619dba8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
@@ -56,7 +56,7 @@ public class ExecutorDispatcherTest {
}
@Test
- public void testDispatch() throws ExecuteException {
+ public void testDispatch() throws Exception {
int port = 30000;
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(port);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 073df65..97c45f1 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -20,10 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.concurrent.ScheduledExecutorService;
-
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@@ -33,13 +29,17 @@ import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecC
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -59,6 +59,7 @@ public class MasterRegistryClientTest {
@Mock
private MasterConfig masterConfig;
+ @Mock
private RegistryClient registryClient;
@Mock
@@ -72,13 +73,10 @@ public class MasterRegistryClientTest {
@Before
public void before() throws Exception {
- PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
- registryClient = PowerMockito.mock(RegistryClient.class);
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
- given(registryClient.getMasterFailoverLockPath()).willReturn("/path");
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
- doNothing().when(registryClient).handleDeadServer(Mockito.anyString(), Mockito.any(NodeType.class), Mockito.anyString());
+ doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
ProcessInstance processInstance = new ProcessInstance();
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index bc8b367..753dc31 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -42,6 +42,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
+ </dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
deleted file mode 100644
index ba74f88..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
-import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
-import org.apache.dolphinscheduler.spi.register.Registry;
-import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
-import org.apache.dolphinscheduler.spi.register.RegistryException;
-import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
-import org.apache.dolphinscheduler.spi.register.SubscribeListener;
-
-import org.apache.commons.lang.StringUtils;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * All business parties use this class to access the registry
- */
-public class RegistryCenter {
-
- private static final Logger logger = LoggerFactory.getLogger(RegistryCenter.class);
-
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
-
- private Registry registry;
-
- private IStoppable stoppable;
-
- /**
- * nodes namespace
- */
- protected static String NODES;
-
- private RegistryPluginManager registryPluginManager;
-
- protected static final String EMPTY = "";
-
- private static final String REGISTRY_PREFIX = "registry";
-
- private static final String REGISTRY_PLUGIN_BINDING = "registry.plugin.binding";
-
- private static final String REGISTRY_PLUGIN_DIR = "registry.plugin.dir";
-
- private static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
-
- private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
-
- /**
- * default registry plugin dir
- */
- private static final String REGISTRY_PLUGIN_PATH = "lib/plugin/registry";
-
- private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
-
- /**
- * init node persist
- */
- public void init() {
- if (isStarted.compareAndSet(false, true)) {
- PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
- Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
-
- if (null == registryConfig || registryConfig.isEmpty()) {
- throw new RegistryException("registry config param is null");
- }
- if (null == registryPluginManager) {
- installRegistryPlugin(registryConfig.get(REGISTRY_PLUGIN_NAME));
- registry = registryPluginManager.getRegistry();
- }
-
- registry.init(registryConfig);
- initNodes();
-
- }
- }
-
- /**
- * init nodes
- */
- private void initNodes() {
- persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
- persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
- persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
- }
-
- /**
- * install registry plugin
- */
- private void installRegistryPlugin(String registryPluginName) {
- DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
- registryPluginManagerConfig.setPlugins(PropertyUtils.getString(REGISTRY_PLUGIN_BINDING));
- if (StringUtils.isNotBlank(PropertyUtils.getString(REGISTRY_PLUGIN_DIR))) {
- registryPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(REGISTRY_PLUGIN_DIR, REGISTRY_PLUGIN_PATH).trim());
- }
-
- if (StringUtils.isNotBlank(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY))) {
- registryPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
- }
-
- registryPluginManager = new RegistryPluginManager(registryPluginName);
-
- DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
- try {
- registryPluginLoader.loadPlugins();
- } catch (Exception e) {
- throw new RuntimeException("Load registry Plugin Failed !", e);
- }
- }
-
- /**
- * close
- */
- public void close() {
- if (isStarted.compareAndSet(true, false) && registry != null) {
- registry.close();
- }
- }
-
- public void persist(String key, String value) {
- registry.persist(key, value);
- }
-
- public void persistEphemeral(String key, String value) {
- registry.persistEphemeral(key, value);
- }
-
- public void remove(String key) {
- registry.remove(key);
- }
-
- public void update(String key, String value) {
- registry.update(key, value);
- }
-
- public String get(String key) {
- return registry.get(key);
- }
-
- public void subscribe(String path, SubscribeListener subscribeListener) {
- registry.subscribe(path, subscribeListener);
- }
-
- public void addConnectionStateListener(RegistryConnectListener registryConnectListener) {
- registry.addConnectionStateListener(registryConnectListener);
- }
-
- public boolean isExisted(String key) {
- return registry.isExisted(key);
- }
-
- public boolean getLock(String key) {
- return registry.acquireLock(key);
- }
-
- public boolean releaseLock(String key) {
- return registry.releaseLock(key);
- }
-
- /**
- * @return get dead server node parent path
- */
- public String getDeadZNodeParentPath() {
- return REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
- }
-
- public void setStoppable(IStoppable stoppable) {
- this.stoppable = stoppable;
- }
-
- public IStoppable getStoppable() {
- return stoppable;
- }
-
- /**
- * whether master path
- *
- * @param path path
- * @return result
- */
- public boolean isMasterPath(String path) {
- return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
- }
-
- /**
- * get worker group path
- *
- * @param workerGroup workerGroup
- * @return worker group path
- */
- public String getWorkerGroupPath(String workerGroup) {
- return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
- }
-
- /**
- * whether worker path
- *
- * @param path path
- * @return result
- */
- public boolean isWorkerPath(String path) {
- return path != null && path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
- }
-
- /**
- * get children nodes
- *
- * @param key key
- * @return children nodes
- */
- public List<String> getChildrenKeys(final String key) {
- return registry.getChildren(key);
- }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 210ce21..f384678 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -22,60 +22,72 @@ import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import org.apache.commons.lang.StringUtils;
-
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.Registry;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.RegistryFactory;
+import org.apache.dolphinscheduler.registry.api.RegistryFactoryLoader;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
-/**
- * registry client singleton
- */
-public class RegistryClient extends RegistryCenter {
+import com.google.common.base.Strings;
+@Component
+public class RegistryClient {
private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
- private static RegistryClient registryClient = new RegistryClient();
-
- private RegistryClient() {
- super.init();
- }
-
- public static RegistryClient getInstance() {
- return registryClient;
+ private static final String EMPTY = "";
+ private static final String REGISTRY_PREFIX = "registry";
+ private static final String REGISTRY_PLUGIN_NAME = "plugin.name";
+ private static final String REGISTRY_CONFIG_FILE_PATH = "/registry.properties";
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ private Registry registry;
+ private IStoppable stoppable;
+
+ @PostConstruct
+ public void afterConstruct() {
+ start();
+ initNodes();
}
- /**
- * get active master num
- *
- * @return active master number
- */
public int getActiveMasterNum() {
- List<String> childrenList = new ArrayList<>();
+ Collection<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
- if (isExisted(getNodeParentPath(NodeType.MASTER))) {
- childrenList = getChildrenKeys(getNodeParentPath(NodeType.MASTER));
+ if (exists(rootNodePath(NodeType.MASTER))) {
+ childrenList = getChildrenKeys(rootNodePath(NodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error", e);
@@ -83,15 +95,9 @@ public class RegistryClient extends RegistryCenter {
return childrenList.size();
}
- /**
- * get server list.
- *
- * @param nodeType zookeeper node type
- * @return server list
- */
public List<Server> getServerList(NodeType nodeType) {
- Map<String, String> serverMaps = getServerMaps(nodeType);
- String parentPath = getNodeParentPath(nodeType);
+ Map<String, String> serverMaps = getServerMaps(nodeType, false);
+ String parentPath = rootNodePath(nodeType);
List<Server> serverList = new ArrayList<>();
for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
@@ -119,40 +125,11 @@ public class RegistryClient extends RegistryCenter {
return serverList;
}
- /**
- * get server nodes.
- *
- * @param nodeType registry node type
- * @return result : list<node>
- */
- public List<String> getServerNodes(NodeType nodeType) {
- String path = getNodeParentPath(nodeType);
- List<String> serverList = getChildrenKeys(path);
- if (nodeType == NodeType.WORKER) {
- List<String> workerList = new ArrayList<>();
- for (String group : serverList) {
- List<String> groupServers = getChildrenKeys(path + SINGLE_SLASH + group);
- for (String groupServer : groupServers) {
- workerList.add(group + SINGLE_SLASH + groupServer);
- }
- }
- serverList = workerList;
- }
- return serverList;
- }
-
- /**
- * get server list map.
- *
- * @param nodeType zookeeper node type
- * @param hostOnly host only
- * @return result : {host : resource info}
- */
public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
Map<String, String> serverMap = new HashMap<>();
try {
- String path = getNodeParentPath(nodeType);
- List<String> serverList = getServerNodes(nodeType);
+ String path = rootNodePath(nodeType);
+ Collection<String> serverList = getServerNodes(nodeType);
for (String server : serverList) {
String host = server;
if (nodeType == NodeType.WORKER && hostOnly) {
@@ -167,298 +144,194 @@ public class RegistryClient extends RegistryCenter {
return serverMap;
}
- /**
- * get server list map.
- *
- * @param nodeType zookeeper node type
- * @return result : {host : resource info}
- */
- public Map<String, String> getServerMaps(NodeType nodeType) {
- return getServerMaps(nodeType, false);
+ public boolean checkNodeExists(String host, NodeType nodeType) {
+ return getServerMaps(nodeType, true).keySet()
+ .stream()
+ .anyMatch(it -> it.contains(host));
}
- /**
- * get server node set.
- *
- * @param nodeType zookeeper node type
- * @param hostOnly host only
- * @return result : set<host>
- */
- public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
- Set<String> serverSet = new HashSet<>();
- try {
- List<String> serverList = getServerNodes(nodeType);
- for (String server : serverList) {
- String host = server;
- if (nodeType == NodeType.WORKER && hostOnly) {
- host = server.split(SINGLE_SLASH)[1];
- }
- serverSet.add(host);
+ public void handleDeadServer(Collection<String> nodes, NodeType nodeType, String opType) {
+ nodes.forEach(node -> {
+ final String host = getHostByEventDataPath(node);
+ final String type = nodeType == NodeType.MASTER ? MASTER_TYPE : WORKER_TYPE;
+
+ if (opType.equals(DELETE_OP)) {
+ removeDeadServerByHost(host, type);
+ } else if (opType.equals(ADD_OP)) {
+ String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + type + UNDERLINE + host;
+ // Add dead server info to zk dead server path : /dead-servers/
+ registry.put(deadServerPath, type + UNDERLINE + host, false);
+ logger.info("{} server dead , and {} added to zk dead server path success", nodeType, node);
}
- } catch (Exception e) {
- logger.error("get server node set failed", e);
- }
- return serverSet;
+ });
}
- /**
- * get server node list.
- *
- * @param nodeType zookeeper node type
- * @param hostOnly host only
- * @return result : list<host>
- */
- public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
- Set<String> serverSet = getServerNodeSet(nodeType, hostOnly);
- List<String> serverList = new ArrayList<>(serverSet);
- Collections.sort(serverList);
- return serverList;
- }
+ public boolean checkIsDeadServer(String node, String serverType) {
+ // ip_sequence_no
+ String[] zNodesPath = node.split("/");
+ String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+ String deadServerPath = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
- /**
- * check the zookeeper node already exists
- *
- * @param host host
- * @param nodeType zookeeper node type
- * @return true if exists
- */
- public boolean checkNodeExists(String host, NodeType nodeType) {
- String path = getNodeParentPath(nodeType);
- if (StringUtils.isEmpty(path)) {
- logger.error("check zk node exists error, host:{}, zk node type:{}",
- host, nodeType);
- return false;
- }
- Map<String, String> serverMaps = getServerMaps(nodeType, true);
- for (String hostKey : serverMaps.keySet()) {
- if (hostKey.contains(host)) {
- return true;
- }
- }
- return false;
+ return !exists(node) || exists(deadServerPath);
}
- /**
- * @return get worker node parent path
- */
- protected String getWorkerNodeParentPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+ public Collection<String> getMasterNodesDirectly() {
+ return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
- /**
- * @return get master node parent path
- */
- protected String getMasterNodeParentPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+ public Collection<String> getWorkerGroupDirectly() {
+ return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
- /**
- * @return get dead server node parent path
- */
- protected String getDeadNodeParentPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+ public Collection<String> getWorkerGroupNodesDirectly(String workerGroup) {
+ return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
}
/**
- * @return get master lock path
+ * get host ip:port, path format: parentPath/ip:port
+ *
+ * @param path path
+ * @return host ip:port, string format: parentPath/ip:port
*/
- public String getMasterLockPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
+ public String getHostByEventDataPath(String path) {
+ checkArgument(!Strings.isNullOrEmpty(path), "path cannot be null or empty");
+
+ final String[] pathArray = path.split(SINGLE_SLASH);
+
+ checkArgument(pathArray.length >= 1, "cannot parse path: %s", path);
+
+ return pathArray[pathArray.length - 1];
}
- /**
- * @param nodeType zookeeper node type
- * @return get zookeeper node parent path
- */
- public String getNodeParentPath(NodeType nodeType) {
- String path = "";
- switch (nodeType) {
- case MASTER:
- return getMasterNodeParentPath();
- case WORKER:
- return getWorkerNodeParentPath();
- case DEAD_SERVER:
- return getDeadNodeParentPath();
- default:
- break;
+ public void close() throws IOException {
+ if (isStarted.compareAndSet(true, false) && registry != null) {
+ registry.close();
}
- return path;
}
- /**
- * @return get master start up lock path
- */
- public String getMasterStartUpLockPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+ public void persistEphemeral(String key, String value) {
+ registry.put(key, value, true);
}
- /**
- * @return get master failover lock path
- */
- public String getMasterFailoverLockPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+ public void remove(String key) {
+ registry.delete(key);
}
- /**
- * @return get worker failover lock path
- */
- public String getWorkerFailoverLockPath() {
- return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+ public String get(String key) {
+ return registry.get(key);
}
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param node node path
- * @param nodeType master or worker
- * @param opType delete or add
- * @throws Exception errors
- */
- public void handleDeadServer(String node, NodeType nodeType, String opType) throws Exception {
- String host = getHostByEventDataPath(node);
- String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
-
- //check server restart, if restart , dead server path in zk should be delete
- if (opType.equals(DELETE_OP)) {
- removeDeadServerByHost(host, type);
-
- } else if (opType.equals(ADD_OP)) {
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
- if (!isExisted(deadServerPath)) {
- //add dead server info to zk dead server path : /dead-servers/
+ public void subscribe(String path, SubscribeListener listener) {
+ registry.subscribe(path, listener);
+ }
- persist(deadServerPath, (type + UNDERLINE + host));
+ public void addConnectionStateListener(ConnectionListener listener) {
+ registry.addConnectionStateListener(listener);
+ }
- logger.info("{} server dead , and {} added to zk dead server path success",
- nodeType, node);
- }
- }
+ public boolean exists(String key) {
+ return registry.exists(key);
+ }
+ public boolean getLock(String key) {
+ return registry.acquireLock(key);
}
- /**
- * check dead server or not , if dead, stop self
- *
- * @param node node path
- * @param serverType master or worker prefix
- * @return true if not exists
- * @throws Exception errors
- */
- public boolean checkIsDeadServer(String node, String serverType) throws Exception {
- // ip_sequence_no
- String[] zNodesPath = node.split("\\/");
- String ipSeqNo = zNodesPath[zNodesPath.length - 1];
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo;
+ public boolean releaseLock(String key) {
+ return registry.releaseLock(key);
+ }
- return !isExisted(node) || isExisted(deadServerPath);
+ public void setStoppable(IStoppable stoppable) {
+ this.stoppable = stoppable;
}
- /**
- * get master nodes directly
- *
- * @return master nodes
- */
- public Set<String> getMasterNodesDirectly() {
- List<String> masters = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
- return new HashSet<>(masters);
+ public IStoppable getStoppable() {
+ return stoppable;
}
- /**
- * get worker nodes directly
- *
- * @return master nodes
- */
- public Set<String> getWorkerNodesDirectly() {
- List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
- return new HashSet<>(workers);
+ public boolean isMasterPath(String path) {
+ return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
- /**
- * get worker group directly
- *
- * @return worker group nodes
- */
- public Set<String> getWorkerGroupDirectly() {
- List<String> workers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
- return new HashSet<>(workers);
+ public boolean isWorkerPath(String path) {
+ return path != null && path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
- /**
- * get worker group nodes
- */
- public Set<String> getWorkerGroupNodesDirectly(String workerGroup) {
- List<String> workers = getChildrenKeys(getWorkerGroupPath(workerGroup));
- return new HashSet<>(workers);
+ public Collection<String> getChildrenKeys(final String key) {
+ return registry.children(key);
}
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param nodeSet node path set
- * @param nodeType master or worker
- * @param opType delete or add
- * @throws Exception errors
- */
- public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) throws Exception {
+ public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
+ try {
+ return getServerNodes(nodeType).stream().map(server -> {
+ if (nodeType == NodeType.WORKER && hostOnly) {
+ return server.split(SINGLE_SLASH)[1];
+ }
+ return server;
+ }).collect(Collectors.toSet());
+ } catch (Exception e) {
+ throw new RegistryException("Failed to get server node: " + nodeType, e);
+ }
+ }
- String type = (nodeType == NodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE;
- for (String node : nodeSet) {
- String host = getHostByEventDataPath(node);
- //check server restart, if restart , dead server path in zk should be delete
- if (opType.equals(DELETE_OP)) {
- removeDeadServerByHost(host, type);
+ private void start() {
+ if (isStarted.compareAndSet(false, true)) {
+ PropertyUtils.loadPropertyFile(REGISTRY_CONFIG_FILE_PATH);
+ final Map<String, String> registryConfig = PropertyUtils.getPropertiesByPrefix(REGISTRY_PREFIX);
- } else if (opType.equals(ADD_OP)) {
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
- if (!isExisted(deadServerPath)) {
- //add dead server info to zk dead server path : /dead-servers/
- persist(deadServerPath, (type + UNDERLINE + host));
- logger.info("{} server dead , and {} added to registry dead server path success",
- nodeType, node);
- }
+ if (null == registryConfig || registryConfig.isEmpty()) {
+ throw new RegistryException("registry config param is null");
}
-
+ final String pluginName = registryConfig.get(REGISTRY_PLUGIN_NAME);
+ final Map<String, RegistryFactory> factories = RegistryFactoryLoader.load();
+ if (!factories.containsKey(pluginName)) {
+ throw new RegistryException("No such registry plugin: " + pluginName);
+ }
+ registry = factories.get(pluginName).create();
+ registry.start(registryConfig);
}
+ }
+ private void initNodes() {
+ registry.put(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY, false);
+ registry.put(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY, false);
+ registry.put(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY, false);
}
- /**
- * get host ip:port, string format: parentPath/ip:port
- *
- * @param path path
- * @return host ip:port, string format: parentPath/ip:port
- */
- public String getHostByEventDataPath(String path) {
- if (StringUtils.isEmpty(path)) {
- logger.error("empty path!");
- return "";
- }
- String[] pathArray = path.split(SINGLE_SLASH);
- if (pathArray.length < 1) {
- logger.error("parse ip error: {}", path);
- return "";
+ private String rootNodePath(NodeType type) {
+ switch (type) {
+ case MASTER:
+ return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+ case WORKER:
+ return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+ case DEAD_SERVER:
+ return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+ default:
+ throw new IllegalStateException("Should not reach here");
}
- return pathArray[pathArray.length - 1];
+ }
+ private Collection<String> getServerNodes(NodeType nodeType) {
+ final String path = rootNodePath(nodeType);
+ final Collection<String> serverList = getChildrenKeys(path);
+ if (nodeType != NodeType.WORKER) {
+ return serverList;
+ }
+ return serverList.stream().flatMap(group ->
+ getChildrenKeys(path + SINGLE_SLASH + group)
+ .stream()
+ .map(it -> group + SINGLE_SLASH + it)
+ ).collect(Collectors.toList());
}
- /**
- * remove dead server by host
- *
- * @param host host
- * @param serverType serverType
- */
- public void removeDeadServerByHost(String host, String serverType) {
- List<String> deadServers = getChildrenKeys(getDeadZNodeParentPath());
+ private void removeDeadServerByHost(String host, String serverType) {
+ Collection<String> deadServers = getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS);
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
- String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+ String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + SINGLE_SLASH + serverPath;
remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
-
}
diff --git a/dolphinscheduler-service/src/main/resources/registry.properties b/dolphinscheduler-service/src/main/resources/registry.properties
index b00dfc0..f534a65 100644
--- a/dolphinscheduler-service/src/main/resources/registry.properties
+++ b/dolphinscheduler-service/src/main/resources/registry.properties
@@ -15,18 +15,5 @@
# limitations under the License.
#
-#registry.plugin.dir config the Registry Plugin dir.
-registry.plugin.dir=lib/plugin/registry
-
registry.plugin.name=zookeeper
registry.servers=127.0.0.1:2181
-
-#maven.local.repository=/usr/local/localRepository
-
-#registry.plugin.binding config the Registry Plugin need be load when development and run in IDE
-#registry.plugin.binding=./dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
-
-#registry timeout
-#registry.session.timeout.ms=30000
-#registry.connection.timeout.ms=7500
-#registry.block.until.connected.wait=600
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java
deleted file mode 100644
index 2dc2193..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryClientTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
-import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
-
-import static org.mockito.BDDMockito.given;
-
-import org.apache.dolphinscheduler.common.enums.NodeType;
-import org.apache.dolphinscheduler.spi.register.Registry;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.common.collect.Sets;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ RegistryClient.class })
-public class RegistryClientTest {
-
- private RegistryClient registryClient;
-
- @Test
- public void test() throws Exception {
- Registry registry = PowerMockito.mock(Registry.class);
- PowerMockito.doNothing().when(registry).persist(Mockito.anyString(), Mockito.anyString());
- PowerMockito.doNothing().when(registry).update(Mockito.anyString(), Mockito.anyString());
- PowerMockito.when(registry.releaseLock(Mockito.anyString())).thenReturn(true);
- PowerMockito.when(registry.getChildren("/dead-servers")).thenReturn(Arrays.asList("worker_127.0.0.1:8089"));
-
- PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));
- registryClient = PowerMockito.mock(RegistryClient.class);
- registryClient.persist("/key", "");
- registryClient.update("/key", "");
- registryClient.releaseLock("/key");
- registryClient.getChildrenKeys("/key");
- registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, DELETE_OP);
- registryClient.handleDeadServer(Sets.newHashSet("ma/127.0.0.1:8089"), NodeType.WORKER, ADD_OP);
- //registryClient.removeDeadServerByHost("127.0.0.1:8089","master");
- registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, DELETE_OP);
- registryClient.handleDeadServer("ma/127.0.0.1:8089", NodeType.WORKER, ADD_OP);
- registryClient.checkIsDeadServer("master/127.0.0.1","master");
- given(registry.getChildren("/nodes/worker")).willReturn(Arrays.asList("worker_127.0.0.1:8089"));
- given(registry.getChildren("/nodes/worker/worker_127.0.0.1:8089")).willReturn(Arrays.asList("default"));
-
- registryClient.checkNodeExists("127.0.0.1",NodeType.WORKER);
-
- registryClient.getServerList(NodeType.MASTER);
-
- }
-
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java
deleted file mode 100644
index a35252c..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/registry/RegistryPluginTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.registry;
-
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
-import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
-import org.apache.dolphinscheduler.spi.register.RegistryPluginManager;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class RegistryPluginTest {
-
- @Test
- public void testLoadPlugin() throws Exception {
- DolphinPluginManagerConfig registryPluginManagerConfig = new DolphinPluginManagerConfig();
- String path = DolphinPluginLoader.class.getClassLoader().getResource("").getPath();
-
- String registryPluginZkPath = path + "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml";
- registryPluginManagerConfig.setPlugins(registryPluginZkPath);
- RegistryPluginManager registryPluginManager = new RegistryPluginManager("zookeeper");
-
- DolphinPluginLoader registryPluginLoader = new DolphinPluginLoader(registryPluginManagerConfig, ImmutableList.of(registryPluginManager));
- registryPluginLoader.loadPlugins();
- Assert.assertNotNull(registryPluginManager.getRegistry());
-
- }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
index 1c5f1c5..a073c53 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/DolphinSchedulerPlugin.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.spi;
import static java.util.Collections.emptyList;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.register.RegistryFactory;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
/**
@@ -43,14 +42,6 @@ public interface DolphinSchedulerPlugin {
}
/**
- * get registry plugin factory
- * @return registry factory
- */
- default Iterable<RegistryFactory> getRegisterFactorys() {
- return emptyList();
- }
-
- /**
* get task plugin factory
* @return registry factory
*/
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java
deleted file mode 100644
index 6675ef6..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ConnectStateListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-public interface ConnectStateListener {
-
- void notify(RegistryConnectState state);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java
deleted file mode 100644
index a6aa32d..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/DataChangeEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Monitor the type of data changes
- */
-public enum DataChangeEvent {
-
- ADD("ADD", 1),
- REMOVE("REMOVE", 2),
- UPDATE("UPDATE",3);
-
- private String type;
-
- private int value;
-
- DataChangeEvent(String type, int value) {
- this.type = type;
- this.value = value;
- }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
deleted file mode 100644
index 94b13e6..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-import java.util.HashMap;
-
-/**
- * The registry node monitors subscriptions
- */
-public class ListenerManager {
-
- /**
- * All message subscriptions must be subscribed uniformly at startup.
- * A node path only supports one listener
- */
- private static HashMap<String, SubscribeListener> listeners = new HashMap<>();
-
- /**
- * Check whether the key has been monitored
- */
- public static boolean checkHasListeners(String path) {
- return null != listeners.get(path);
- }
-
- /**
- * add listener(A node can only be monitored by one listener)
- */
- public static void addListener(String path, SubscribeListener listener) {
- listeners.put(path, listener);
- }
-
- /**
- * remove listener
- */
- public static void removeListener(String path) {
- listeners.remove(path);
- }
-
- /**
- *
- *After the data changes, it is distributed to the corresponding listener for processing
- */
- public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) {
- SubscribeListener notifyListener = listeners.get(key);
- if (null == notifyListener) {
- return;
- }
- notifyListener.notify(path, data, dataChangeEvent);
- }
-
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java
deleted file mode 100644
index 11fe25a..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/Registry.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.dolphinscheduler.spi.register;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The final display of all registry component data must follow a tree structure.
- * Therefore, some registry may need to do a layer of internal conversion, such as Etcd
- */
-public interface Registry {
-
- /**
- * initialize registry center.
- */
- void init(Map<String, String> registerData);
-
- /**
- * close registry
- */
- void close();
-
- /**
- * subscribe registry data change, a path can only be monitored by one listener
- */
- boolean subscribe(String path, SubscribeListener subscribeListener);
-
- /**
- * unsubscribe
- */
- void unsubscribe(String path);
-
- /**
- * Registry status monitoring, globally unique. Only one is allowed to subscribe.
- */
- void addConnectionStateListener(RegistryConnectListener registryConnectListener);
-
- /**
- * get key
- */
- String get(String key);
-
- /**
- * delete
- */
- void remove(String key);
-
- /**
- * persist data
- */
- void persist(String key, String value);
-
- /**
- *persist ephemeral data
- */
- void persistEphemeral(String key, String value);
-
- /**
- * update data
- */
- void update(String key, String value);
-
- /**
- * get children keys
- */
- List<String> getChildren(String path);
-
- /**
- * Judge node is exist or not.
- */
- boolean isExisted(String key);
-
- /**
- * delete kay
- */
- boolean delete(String key);
-
- /**
- * Obtain a distributed lock
- * todo It is best to add expiration time, and automatically release the lock after expiration
- */
- boolean acquireLock(String key);
-
- /**
- * release key
- */
- boolean releaseLock(String key);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java
deleted file mode 100644
index 83385f8..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-public interface RegistryConnectListener {
-
- void notify(RegistryConnectState newState);
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java
deleted file mode 100644
index e085e6d..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryConnectState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * All registry connection status must be converted to this
- */
-public enum RegistryConnectState {
- CONNECTED("connected", 1),
- RECONNECTED("reconnected", 2),
- SUSPENDED("suspended", 3),
- LOST("lost", 4);
-
- private String description;
-
- private int state;
-
- RegistryConnectState(String description, int state) {
- this.description = description;
- this.state = state;
- }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java
deleted file mode 100644
index 884f005..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * registry exception
- */
-public class RegistryException extends RuntimeException {
-
- public RegistryException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public RegistryException(String message) {
- super(message);
- }
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java
deleted file mode 100644
index 244c0f4..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Registry the component factory, all registry must implement this interface
- */
-public interface RegistryFactory {
-
- /**
- * get registry component name
- */
- String getName();
-
- /**
- * get registry
- */
- Registry create();
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java
deleted file mode 100644
index 211795f..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/RegistryPluginManager.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
-import org.apache.dolphinscheduler.spi.classloader.ThreadContextClassLoader;
-import org.apache.dolphinscheduler.spi.plugin.AbstractDolphinPluginManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The plug-in address of the registry needs to be configured.
- * Multi-registries are not supported.
- * When the plug-in directory contains multiple plug-ins, only the configured plug-in will be used.
- * todo It’s not good to put it here, consider creating a separate API module for each plugin
- */
-public class RegistryPluginManager extends AbstractDolphinPluginManager {
-
- private static final Logger logger = LoggerFactory.getLogger(RegistryPluginManager.class);
-
- private RegistryFactory registryFactory;
-
- public static Registry registry;
-
- private String registerPluginName;
-
- public RegistryPluginManager(String registerPluginName) {
- this.registerPluginName = registerPluginName;
- }
-
- @Override
- public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) {
- for (RegistryFactory registryFactory : dolphinSchedulerPlugin.getRegisterFactorys()) {
- logger.info("Registering Registry Plugin '{}'", registryFactory.getName());
- if (registerPluginName.equals(registryFactory.getName())) {
- this.registryFactory = registryFactory;
- loadRegistry();
- return;
- }
- }
- if (null == registry) {
- throw new RegistryException(String.format("not found %s registry plugin ", registerPluginName));
- }
- }
-
- /**
- * load registry
- */
- private void loadRegistry() {
- try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(registryFactory.getClass().getClassLoader())) {
- registry = registryFactory.create();
- }
- }
-
- /**
- * get registry
- * @return registry
- */
- public Registry getRegistry() {
- if (null == registry) {
- throw new RegistryException("not install registry");
- }
- return registry;
- }
-
-}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
deleted file mode 100644
index 3db7f2e..0000000
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.spi.register;
-
-/**
- * Registration center subscription. All listeners must implement this interface
- */
-public interface SubscribeListener {
-
- /**
- * Processing logic when the subscription node changes
- */
- void notify(String path, String data, DataChangeEvent dataChangeEvent);
-
-}
diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
index 4a41770..27d9a55 100644
--- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
+++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
@@ -86,15 +86,6 @@ public class StandaloneServer {
private static void startRegistry() throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.servers", server.getConnectString());
-
- final Path registryPath = Paths.get(
- StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
- "../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"
- ).toAbsolutePath();
- if (Files.exists(registryPath)) {
- System.setProperty("registry.plugin.binding", registryPath.toString());
- System.setProperty("registry.plugin.dir", "");
- }
}
private static void startDatabase() throws IOException, SQLException {
diff --git a/pom.xml b/pom.xml
index 0c42a7a..a475181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
<byte-buddy.version>1.9.16</byte-buddy.version>
<java-websocket.version>1.5.1</java-websocket.version>
<py4j.version>0.10.9</py4j.version>
+ <auto-service.version>1.0.1</auto-service.version>
</properties>
<dependencyManagement>
@@ -231,7 +232,12 @@
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-registry-plugin</artifactId>
+ <artifactId>dolphinscheduler-registry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -271,6 +277,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-registry-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
@@ -690,6 +702,13 @@
<artifactId>py4j</artifactId>
<version>${py4j.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -1234,7 +1253,7 @@
<modules>
<module>dolphinscheduler-spi</module>
<module>dolphinscheduler-alert-plugin</module>
- <module>dolphinscheduler-registry-plugin</module>
+ <module>dolphinscheduler-registry</module>
<module>dolphinscheduler-task-plugin</module>
<module>dolphinscheduler-ui</module>
<module>dolphinscheduler-server</module>
diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh
index e62f54e..1798a5d 100755
--- a/script/env/dolphinscheduler_env.sh
+++ b/script/env/dolphinscheduler_env.sh
@@ -20,7 +20,7 @@ export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
-export JAVA_HOME=/opt/soft/java
+export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax