You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/06/07 08:31:06 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a change in pull request #5562: [Feature-#3961][Registry]Registry-SPI

ruanwenjun commented on a change in pull request #5562:
URL: https://github.com/apache/dolphinscheduler/pull/5562#discussion_r646372363



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
##########
@@ -357,9 +299,99 @@ private void failoverMaster(String masterHost) {
         logger.info("master failover end");
     }
 
-    public InterProcessMutex blockAcquireMutex() throws Exception {
-        InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
-        mutex.acquire();
-        return mutex;
+    public void blockAcquireMutex() {
+        registryClient.getLock(registryClient.getMasterLockPath());
     }
+
+    public void releaseLock() {
+        registryClient.releaseLock(registryClient.getMasterLockPath());
+    }
+
+    /**
+     * master config
+     */
+    @Autowired
+    private MasterConfig masterConfig;
+
+    /**
+     * heartbeat executor
+     */
+    private ScheduledExecutorService heartBeatExecutor;
+
+    /**
+     * master start time
+     */
+    private String startTime;
+
+    private String localNodePath;
+
+    @PostConstruct
+    public void init() {
+        this.startTime = DateUtils.dateToString(new Date());
+        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
+        registryClient.init();
+    }
+
+    /**
+     * registry
+     */
+    public void registry() {
+        String address = NetUtils.getAddr(masterConfig.getListenPort());
+        localNodePath = getMasterPath();
+        registryClient.persistEphemeral(localNodePath, "");
+        registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener());
+        int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
+        HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
+                masterConfig.getMasterMaxCpuloadAvg(),
+                masterConfig.getMasterReservedMemory(),
+                Sets.newHashSet(getMasterPath()),
+                Constants.MASTER_TYPE,
+                registryClient);
+
+        this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
+
+    }
+
+    class MasterRegistryConnectStateListener implements RegistryConnectListener {
+
+        @Override
+        public void notify(RegistryConnectState newState) {
+            if (RegistryConnectState.RECONNECTED == newState) {
+                registryClient.persistEphemeral(localNodePath, "");

Review comment:
       When reconnecting to ZooKeeper, if the session has not expired, the previous ephemeral node may still exist.
   Deleting that previous node will cause the current instance to be killed.
   ```
   public void persistEphemeral(String key, String value) {
           try {
               if (isExisted(key)) {
                   client.delete().deletingChildrenIfNeeded().forPath(key);
               }
               client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
           } catch (Exception e) {
               throw new RegistryException("zookeeper persist ephemeral error", e);
           }
       }
   ```

##########
File path: dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
##########
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.registry;
+
+import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.utils.ResInfo;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * abstract registry client
+ */
+@Service
+public class RegistryClient extends RegistryCenter {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegistryClient.class);
+
+    private void loadRegistry() {
+        init();
+    }
+
+    /**
+     * get active master num
+     *
+     * @return active master number
+     */
+    public int getActiveMasterNum() {
+        List<String> childrenList = new ArrayList<>();
+        try {
+            // read master node parent path from conf
+            if (isExisted(getNodeParentPath(NodeType.MASTER))) {
+                childrenList = getChildrenKeys(getNodeParentPath(NodeType.MASTER));
+            }
+        } catch (Exception e) {
+            logger.error("getActiveMasterNum error", e);
+        }
+        return childrenList.size();
+    }
+
+    /**
+     * get server list.
+     *
+     * @param nodeType zookeeper node type
+     * @return server list
+     */
+    public List<Server> getServerList(NodeType nodeType) {
+        Map<String, String> serverMaps = getServerMaps(nodeType);
+        String parentPath = getNodeParentPath(nodeType);
+
+        List<Server> serverList = new ArrayList<>();
+        for (Map.Entry<String, String> entry : serverMaps.entrySet()) {
+            Server server = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+            if (server == null) {
+                continue;
+            }
+            String key = entry.getKey();
+            server.setZkDirectory(parentPath + "/" + key);
+            // set host and port
+            String[] hostAndPort = key.split(COLON);
+            String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+            // fetch the last one
+            server.setHost(hosts[hosts.length - 1]);
+            server.setPort(Integer.parseInt(hostAndPort[1]));
+            serverList.add(server);
+        }
+        return serverList;
+    }
+
+    /**
+     * get server nodes.
+     *
+     * @param nodeType registry node type
+     * @return result : list<node>
+     */
+    public List<String> getServerNodes(NodeType nodeType) {
+        String path = getNodeParentPath(nodeType);
+        List<String> serverList = getChildrenKeys(path);
+        if (nodeType == NodeType.WORKER) {
+            List<String> workerList = new ArrayList<>();
+            for (String group : serverList) {
+                List<String> groupServers = getChildrenKeys(path + Constants.SLASH + group);
+                for (String groupServer : groupServers) {
+                    workerList.add(group + Constants.SLASH + groupServer);
+                }
+            }
+            serverList = workerList;
+        }
+        return serverList;
+    }
+
+    /**
+     * get server list map.
+     *
+     * @param nodeType zookeeper node type
+     * @param hostOnly host only
+     * @return result : {host : resource info}
+     */
+    public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
+        Map<String, String> serverMap = new HashMap<>();
+        try {
+            String path = getNodeParentPath(nodeType);
+            List<String> serverList = getServerNodes(nodeType);
+            for (String server : serverList) {
+                String host = server;
+                if (nodeType == NodeType.WORKER && hostOnly) {
+                    host = server.split(Constants.SLASH)[1];
+                }
+                serverMap.putIfAbsent(host, get(path + Constants.SLASH + server));
+            }
+        } catch (Exception e) {
+            logger.error("get server list failed", e);
+        }
+
+        return serverMap;
+    }
+
+    /**
+     * get server list map.
+     *
+     * @param nodeType zookeeper node type
+     * @return result : {host : resource info}
+     */
+    public Map<String, String> getServerMaps(NodeType nodeType) {
+        return getServerMaps(nodeType, false);
+    }
+
+    /**
+     * get server node set.
+     *
+     * @param nodeType zookeeper node type
+     * @param hostOnly host only
+     * @return result : set<host>
+     */
+    public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
+        Set<String> serverSet = new HashSet<>();
+        try {
+            List<String> serverList = getServerNodes(nodeType);
+            for (String server : serverList) {
+                String host = server;
+                if (nodeType == NodeType.WORKER && hostOnly) {
+                    host = server.split(Constants.SLASH)[1];
+                }
+                serverSet.add(host);
+            }
+        } catch (Exception e) {
+            logger.error("get server node set failed", e);
+        }
+        return serverSet;
+    }
+
+    /**
+     * get server node list.
+     *
+     * @param nodeType zookeeper node type
+     * @param hostOnly host only
+     * @return result : list<host>
+     */
+    public List<String> getServerNodeList(NodeType nodeType, boolean hostOnly) {
+        Set<String> serverSet = getServerNodeSet(nodeType, hostOnly);
+        List<String> serverList = new ArrayList<>(serverSet);
+        Collections.sort(serverList);
+        return serverList;
+    }
+
+    /**
+     * check the zookeeper node already exists
+     *
+     * @param host host
+     * @param nodeType zookeeper node type
+     * @return true if exists
+     */
+    public boolean checkNodeExists(String host, NodeType nodeType) {
+        String path = getNodeParentPath(nodeType);
+        if (StringUtils.isEmpty(path)) {
+            logger.error("check zk node exists error, host:{}, zk node type:{}",
+                    host, nodeType);
+            return false;
+        }
+        Map<String, String> serverMaps = getServerMaps(nodeType, true);
+        for (String hostKey : serverMaps.keySet()) {
+            if (hostKey.contains(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return get worker node parent path
+     */
+    protected String getWorkerNodeParentPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+    }
+
+    /**
+     * @return get master node parent path
+     */
+    protected String getMasterNodeParentPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+    }
+
+    /**
+     * @return get dead server node parent path
+     */
+    protected String getDeadNodeParentPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    /**
+     * @return get master lock path
+     */
+    public String getMasterLockPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
+    }
+
+    /**
+     * @param nodeType zookeeper node type
+     * @return get zookeeper node parent path
+     */
+    public String getNodeParentPath(NodeType nodeType) {
+        String path = "";
+        switch (nodeType) {
+            case MASTER:
+                return getMasterNodeParentPath();
+            case WORKER:
+                return getWorkerNodeParentPath();
+            case DEAD_SERVER:
+                return getDeadNodeParentPath();
+            default:
+                break;
+        }
+        return path;
+    }
+
+    /**
+     * @return get master start up lock path
+     */
+    public String getMasterStartUpLockPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+    }
+
+    /**
+     * @return get master failover lock path
+     */
+    public String getMasterFailoverLockPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+    }
+
+    /**
+     * @return get worker failover lock path
+     */
+    public String getWorkerFailoverLockPath() {
+        return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+    }
+
+    /**
+     * init system node
+     */
+    public void initSystemNode() {

Review comment:
       The `initSystemNode` method does the same thing as the parent method `RegistryCenter.initNodes()`.
   We should keep only one of them.
   
   Also, when the `persist` method is executed on an existing key, it deletes that node together with all of its sub-nodes.
   We should first check whether the system node has already been initialized.
    
   ```java
   public void persist(String key, String value) {
           try {
               if (isExisted(key)) {
                   client.delete().deletingChildrenIfNeeded().forPath(key);
               }
               client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
   
           } catch (Exception e) {
               throw new RegistryException("zookeeper persist error", e);
           }
       }
   ```

##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
##########
@@ -15,171 +15,152 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.master.zk;
+package org.apache.dolphinscheduler.server.master.registry;
 
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
 import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
+import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
 
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import com.google.common.collect.Sets;
+
 /**
  * zookeeper master client
  * <p>
  * single instance
  */
 @Component
-public class ZKMasterClient extends AbstractZKClient {
+public class MasterRegistryClient {
 
     /**
      * logger
      */
-    private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
+    private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
 
     /**
      * process service
      */
     @Autowired
     private ProcessService processService;
-
-    /**
-     * master registry
-     */
     @Autowired
-    private MasterRegistry masterRegistry;
+    private RegistryClient registryClient;
 
     public void start() {
-        InterProcessMutex mutex = null;
+        init();
+        String znodeLock = registryClient.getMasterStartUpLockPath();
         try {
             // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
-            String znodeLock = getMasterStartUpLockPath();
-            mutex = new InterProcessMutex(getZkClient(), znodeLock);
-            mutex.acquire();
 
+            registryClient.getLock(znodeLock);
             // master registry
-            masterRegistry.registry();
-            String registryPath = this.masterRegistry.getMasterPath();
-            masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
+            registry();
+            String registryPath = getMasterPath();
+            registryClient.handleDeadServer(registryPath, NodeType.MASTER, Constants.DELETE_OP);
 
-            // init system znode
-            this.initSystemZNode();
+            // init system node
+            registryClient.initSystemNode();

Review comment:
       `initSystemNode()` should be executed before the `registry()` method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org