You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:04 UTC

[pulsar] 18/38: [broker] register loadbalance znode should attempt to wait until session expired (#6788)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4cb536ba25a57f83febdb7ae1cd1d2b84d02d7a8
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Apr 22 17:47:24 2020 -0700

    [broker] register loadbalance znode should attempt to wait until session expired (#6788)
    
    *Motivation*
    
    The exception `Broker-znode owned by different zk-session` is commonly seen when a broker
    starts up in a Kubernetes environment. That is because the previous znode has not expired yet.
    (cherry picked from commit b119611f18afc159f35c993ef48ad0a5e6537707)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 37 +++++-----
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 30 +++++----
 .../java/org/apache/pulsar/zookeeper/ZkUtils.java  | 78 ++++++++++++++++++++++
 3 files changed, 112 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c3013a0..e9c896d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -798,18 +798,24 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
             updateLocalBrokerData();
             try {
-                ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
+                if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired(
+                    zkClient, brokerZnodePath,
+                    pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) {
+                    ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-            } catch (KeeperException.NodeExistsException e) {
-                long ownerZkSessionId = getBrokerZnodeOwner();
-                if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) {
-                    log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
-                            ownerZkSessionId);
-                    throw new PulsarServerException(
-                            "Broker-znode owned by different zk-session " + ownerZkSessionId);
+                } else {
+                    // Node may already be created by another load manager: in this case update the data.
+                    zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
                 }
-                // Node may already be created by another load manager: in this case update the data.
-                zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
+            } catch (KeeperException.NodeExistsException e) {
+                log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath);
+                throw new PulsarServerException(
+                    "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session");
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                // Catching exception here to print the right error message
+                log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie);
+                throw ie;
             } catch (Exception e) {
                 // Catching exception here to print the right error message
                 log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
@@ -939,17 +945,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         }
     }
 
-    private long getBrokerZnodeOwner() {
-        try {
-            Stat stat = new Stat();
-            zkClient.getData(brokerZnodePath, false, stat);
-            return stat.getEphemeralOwner();
-        } catch (Exception e) {
-            log.warn("Failed to get stat of {}", brokerZnodePath, e);
-        }
-        return 0;
-    }
-
     private void refreshBrokerToFailureDomainMap() {
         if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 765f6c6..56fe777 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -301,20 +301,26 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
                 loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(loadReport);
             }
             try {
-                ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
+                if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired(
+                    pulsar.getZkClient(), brokerZnodePath,
+                    pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) {
+                    ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
                         loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-            } catch (KeeperException.NodeExistsException e) {
-                long ownerZkSessionId = getBrokerZnodeOwner();
-                if (ownerZkSessionId != 0 && ownerZkSessionId != pulsar.getZkClient().getSessionId()) {
-                    log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
-                            ownerZkSessionId);
-                    throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
-                }
-                // Node may already be created by another load manager: in this case update the data.
-                if (loadReport != null) {
-                    pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
+                } else {
+                    // Node may already be created by another load manager: in this case update the data.
+                    if (loadReport != null) {
+                        pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
+                    }
                 }
-
+            } catch (KeeperException.NodeExistsException e) {
+                log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath);
+                throw new PulsarServerException(
+                    "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session");
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                // Catching exception here to print the right error message
+                log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie);
+                throw ie;
             } catch (Exception e) {
                 // Catching excption here to print the right error message
                 log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
new file mode 100644
index 0000000..d9e8ad4
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper utils.
+ */
+public final class ZkUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ZkUtils.class);
+
+    /**
+     * Check whether <i>path</i> exists and, if another session owns it, wait for it to be deleted.
+     *
+     * @param zk the zookeeper client instance
+     * @param path the zookeeper path
+     * @param sessionTimeoutMs maximum time to wait for the znode deletion, in milliseconds
+     * @return true if path exists and its ephemeral owner is this session; false if absent or deleted while waiting
+     * @throws KeeperException on zookeeper access failure; NodeExistsException if the znode outlives the wait
+     * @throws InterruptedException if interrupted while waiting for the znode to be deleted
+     */
+    public static boolean checkNodeAndWaitExpired(ZooKeeper zk,
+                                                  String path,
+                                                  long sessionTimeoutMs) throws KeeperException, InterruptedException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        Watcher zkPrevNodeWatcher = watchedEvent -> {
+            // only a NodeDeleted event releases the latch; other event types are ignored.
+            if (EventType.NodeDeleted == watchedEvent.getType()) {
+                prevNodeLatch.countDown();
+            }
+        };
+        Stat stat = zk.exists(path, zkPrevNodeWatcher);
+        if (null != stat) {
+            // if the ephemeral owner isn't the current zookeeper session,
+            // wait up to sessionTimeoutMs for the znode to be deleted
+            if (stat.getEphemeralOwner() != zk.getSessionId()) {
+                log.info("Previous znode : {} still exists, so waiting {} ms for znode deletion",
+                    path, sessionTimeoutMs);
+                if (!prevNodeLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
+                    throw new NodeExistsException(path);
+                } else {
+                    return false;
+                }
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+}