You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2019/12/30 09:59:15 UTC

[incubator-dolphinscheduler] branch dev updated: refactor AbstractZKClient (#1627)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e935d5  refactor AbstractZKClient (#1627)
5e935d5 is described below

commit 5e935d579eec25701a6ce1d29fea95b161173ba3
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Dec 30 17:59:06 2019 +0800

    refactor AbstractZKClient (#1627)
    
    * we should insert alert DB once, and trigger this type of alert 3 times
    
    * refactor AbstractZKClient
---
 .../common/zk/AbstractZKClient.java                | 96 ++++++++--------------
 1 file changed, 36 insertions(+), 60 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
index 0e95ddd..c3ba718 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
@@ -16,6 +16,22 @@
  */
 package org.apache.dolphinscheduler.common.zk;
 
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -23,26 +39,9 @@ import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ResInfo;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 
 /**
  * abstract zookeeper client
@@ -70,8 +69,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 				return;
 			}
 
-			byte[] bytes = zkClient.getData().forPath(znode);
-			String resInfoStr = new String(bytes);
+			String resInfoStr = super.get(znode);
 			String[] splits = resInfoStr.split(Constants.COMMA);
 			if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
 				return;
@@ -107,8 +105,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 		String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
 		String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
 
-		if(zkClient.checkExists().forPath(zNode) == null ||
-				zkClient.checkExists().forPath(deadServerPath) != null ){
+		if(!isExisted(zNode) || isExisted(deadServerPath)){
 			return true;
 		}
 
@@ -118,14 +115,12 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 
 
 	public void removeDeadServerByHost(String host, String serverType) throws Exception {
-        List<String> deadServers = zkClient.getChildren().forPath(getDeadZNodeParentPath());
+        List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
         for(String serverPath : deadServers){
             if(serverPath.startsWith(serverType+UNDERLINE+host)){
-				String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
-				if(zkClient.checkExists().forPath(server) != null){
-					zkClient.delete().forPath(server);
-					logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
-				}
+							String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
+              super.remove(server);
+							logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
             }
         }
 	}
@@ -143,8 +138,8 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 		// create temporary sequence nodes for master znode
 		String parentPath = getZNodeParentPath(zkNodeType);
 		String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
-		String registerPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
-				serverPathPrefix + UNDERLINE, heartbeatZKInfo.getBytes());
+    String registerPath = serverPathPrefix + UNDERLINE;
+    super.persistEphemeral(registerPath, heartbeatZKInfo);
 		logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
 		return registerPath;
 	}
@@ -165,7 +160,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 		}
 		registerPath = createZNodePath(zkNodeType);
 
-        // handle dead server
+    // handle dead server
 		handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
 
 		return registerPath;
@@ -196,10 +191,10 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 
 		}else if(opType.equals(ADD_ZK_OP)){
 			String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
-			if(zkClient.checkExists().forPath(deadServerPath) == null){
+			if(!super.isExisted(deadServerPath)){
 				//add dead server info to zk dead server path : /dead-servers/
 
-				zkClient.create().forPath(deadServerPath,(type + UNDERLINE + ipSeqNo).getBytes());
+				super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));
 
 				logger.info("{} server dead , and {} added to zk dead server path success" ,
 						zkNodeType.toString(), zNode);
@@ -226,19 +221,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 		List<String> childrenList = new ArrayList<>();
 		try {
 			// read master node parent path from conf
-			if(zkClient.checkExists().forPath(getZNodeParentPath(ZKNodeType.MASTER)) != null){
-				childrenList = zkClient.getChildren().forPath(getZNodeParentPath(ZKNodeType.MASTER));
+			if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
+				childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
 			}
 		} catch (Exception e) {
-			if(e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
-				logger.error("zookeeper service not started",e);
-			}else{
-				logger.error(e.getMessage(),e);
-			}
-
-		}finally {
-			return childrenList.size();
+			logger.error("getActiveMasterNum error",e);
 		}
+		return childrenList.size();
 	}
 
 	/**
@@ -280,10 +269,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 		Map<String, String> masterMap = new HashMap<>();
 		try {
 			String path =  getZNodeParentPath(zkNodeType);
-			List<String> serverList  = getZkClient().getChildren().forPath(path);
+			List<String> serverList  = super.getChildrenKeys(path);
 			for(String server : serverList){
-				byte[] bytes  = getZkClient().getData().forPath(path + "/" + server);
-				masterMap.putIfAbsent(server, new String(bytes));
+				masterMap.putIfAbsent(server, super.get(path + "/" + server));
 			}
 		} catch (Exception e) {
 			logger.error("get server list failed : " + e.getMessage(), e);
@@ -430,9 +418,9 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 	 */
 	protected void initSystemZNode(){
 		try {
-			createNodePath(getMasterZNodeParentPath());
-			createNodePath(getWorkerZNodeParentPath());
-			createNodePath(getDeadZNodeParentPath());
+			persist(getMasterZNodeParentPath(), "");
+			persist(getWorkerZNodeParentPath(), "");
+			persist(getDeadZNodeParentPath(), "");
 
 		} catch (Exception e) {
 			logger.error("init system znode failed : " + e.getMessage(),e);
@@ -440,18 +428,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
 	}
 
 	/**
-	 * create zookeeper node path if not exists
-	 * @param zNodeParentPath zookeeper parent path
-	 * @throws Exception errors
-	 */
-	private void createNodePath(String zNodeParentPath) throws Exception {
-	    if(null == zkClient.checkExists().forPath(zNodeParentPath)){
-	        zkClient.create().creatingParentContainersIfNeeded()
-					.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
-		}
-	}
-
-	/**
 	 * server self dead, stop all threads
 	 * @param serverHost server host
 	 * @param zkNodeType zookeeper node type