You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2019/12/30 09:59:15 UTC
[incubator-dolphinscheduler] branch dev updated: refactor
AbstractZKClient (#1627)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e935d5 refactor AbstractZKClient (#1627)
5e935d5 is described below
commit 5e935d579eec25701a6ce1d29fea95b161173ba3
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Dec 30 17:59:06 2019 +0800
refactor AbstractZKClient (#1627)
* we should insert alert DB once , and trigger this type of alert 3 times
* refactor AbstractZKClient
---
.../common/zk/AbstractZKClient.java | 96 ++++++++--------------
1 file changed, 36 insertions(+), 60 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
index 0e95ddd..c3ba718 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
@@ -16,6 +16,22 @@
*/
package org.apache.dolphinscheduler.common.zk;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* abstract zookeeper client
@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
return;
}
- byte[] bytes = zkClient.getData().forPath(znode);
- String resInfoStr = new String(bytes);
+ String resInfoStr = super.get(znode);
String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
return;
@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
- if(zkClient.checkExists().forPath(zNode) == null ||
- zkClient.checkExists().forPath(deadServerPath) != null ){
+ if(!isExisted(zNode) || isExisted(deadServerPath)){
return true;
}
@@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
public void removeDeadServerByHost(String host, String serverType) throws Exception {
- List<String> deadServers = zkClient.getChildren().forPath(getDeadZNodeParentPath());
+ List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
- String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
- if(zkClient.checkExists().forPath(server) != null){
- zkClient.delete().forPath(server);
- logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
- }
+ String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+ super.remove(server);
+ logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
}
}
}
@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
// create temporary sequence nodes for master znode
String parentPath = getZNodeParentPath(zkNodeType);
String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
- String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
- serverPathPrefix + UNDERLINE, heartbeatZKInfo.getBytes());
+ String registerPath = serverPathPrefix + UNDERLINE;
+ super.persistEphemeral(registerPath, heartbeatZKInfo);
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath;
}
@@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}
registerPath = createZNodePath(zkNodeType);
- // handle dead server
+ // handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath;
@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
- if(zkClient.checkExists().forPath(deadServerPath) == null){
+ if(!super.isExisted(deadServerPath)){
//add dead server info to zk dead server path : /dead-servers/
- zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
+ super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));
logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
@@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
List<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
- if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != null){
- childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER));
+ if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
+ childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
- if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
- logger.error("zookeeper service not started",e);
- }else{
- logger.error(e.getMessage(),e);
- }
-
- }finally {
- return childrenList.size();
+ logger.error("getActiveMasterNum error",e);
}
+ return childrenList.size();
}
/**
@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
Map<String, String> masterMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
- List<String> serverList = getZkClient().getChildren().forPath(path);
+ List<String> serverList = super.getChildrenKeys(path);
for(String server : serverList){
- byte[] bytes = getZkClient().getData().forPath(path + "/" + server);
- masterMap.putIfAbsent(server, new String(bytes));
+ masterMap.putIfAbsent(server, super.get(path + "/" + server));
}
} catch (Exception e) {
logger.error("get server list failed : " + e.getMessage(), e);
@@ -430,9 +418,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
*/
protected void initSystemZNode(){
try {
- createNodePath(getMasterZNodeParentPath());
- createNodePath(getWorkerZNodeParentPath());
- createNodePath(getDeadZNodeParentPath());
+ persist(getMasterZNodeParentPath(), "");
+ persist(getWorkerZNodeParentPath(), "");
+ persist(getDeadZNodeParentPath(), "");
} catch (Exception e) {
logger.error("init system znode failed : " + e.getMessage(),e);
@@ -440,18 +428,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}
/**
- * create zookeeper node path if not exists
- * @param zNodeParentPath zookeeper parent path
- * @throws Exception errors
- */
- private void createNodePath(String zNodeParentPath) throws Exception {
- if(null == zkClient.checkExists().forPath(zNodeParentPath)){
- zkClient.create().creatingParentContainersIfNeeded()
- .withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
- }
- }
-
- /**
* server self dead, stop all threads
* @param serverHost server host
* @param zkNodeType zookeeper node type