You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:04 UTC
[pulsar] 18/38: [broker] register loadbalance znode should attempt
to wait until session expired (#6788)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4cb536ba25a57f83febdb7ae1cd1d2b84d02d7a8
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Apr 22 17:47:24 2020 -0700
[broker] register loadbalance znode should attempt to wait until session expired (#6788)
*Motivation*
The exception `Broker-znode owned by different zk-session` is commonly seen when a broker
starts up in a Kubernetes environment. That's because the previous znode has not expired yet.
(cherry picked from commit b119611f18afc159f35c993ef48ad0a5e6537707)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 37 +++++-----
.../loadbalance/impl/SimpleLoadManagerImpl.java | 30 +++++----
.../java/org/apache/pulsar/zookeeper/ZkUtils.java | 78 ++++++++++++++++++++++
3 files changed, 112 insertions(+), 33 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index c3013a0..e9c896d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -798,18 +798,24 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress;
updateLocalBrokerData();
try {
- ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
+ if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired(
+ zkClient, brokerZnodePath,
+ pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) {
+ ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (KeeperException.NodeExistsException e) {
- long ownerZkSessionId = getBrokerZnodeOwner();
- if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) {
- log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
- ownerZkSessionId);
- throw new PulsarServerException(
- "Broker-znode owned by different zk-session " + ownerZkSessionId);
+ } else {
+ // Node may already be created by another load manager: in this case update the data.
+ zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
}
- // Node may already be created by another load manager: in this case update the data.
- zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
+ } catch (KeeperException.NodeExistsException e) {
+ log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath);
+ throw new PulsarServerException(
+ "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session");
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ // Catching exception here to print the right error message
+ log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie);
+ throw ie;
} catch (Exception e) {
// Catching exception here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
@@ -939,17 +945,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
- private long getBrokerZnodeOwner() {
- try {
- Stat stat = new Stat();
- zkClient.getData(brokerZnodePath, false, stat);
- return stat.getEphemeralOwner();
- } catch (Exception e) {
- log.warn("Failed to get stat of {}", brokerZnodePath, e);
- }
- return 0;
- }
-
private void refreshBrokerToFailureDomainMap() {
if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 765f6c6..56fe777 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -301,20 +301,26 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(loadReport);
}
try {
- ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
+ if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired(
+ pulsar.getZkClient(), brokerZnodePath,
+ pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) {
+ ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (KeeperException.NodeExistsException e) {
- long ownerZkSessionId = getBrokerZnodeOwner();
- if (ownerZkSessionId != 0 && ownerZkSessionId != pulsar.getZkClient().getSessionId()) {
- log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
- ownerZkSessionId);
- throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
- }
- // Node may already be created by another load manager: in this case update the data.
- if (loadReport != null) {
- pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
+ } else {
+ // Node may already be created by another load manager: in this case update the data.
+ if (loadReport != null) {
+ pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
+ }
}
-
+ } catch (KeeperException.NodeExistsException e) {
+ log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath);
+ throw new PulsarServerException(
+ "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session");
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ // Catching exception here to print the right error message
+ log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie);
+ throw ie;
} catch (Exception e) {
// Catching excption here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
new file mode 100644
index 0000000..d9e8ad4
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper utils.
+ */
+public final class ZkUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(ZkUtils.class);
+
+ /**
+ * Check whether <i>path</i> exists and, if its ephemeral owner is not this client's session, wait for it to be deleted.
+ *
+ * @param zk the zookeeper client instance
+ * @param path the zookeeper path
+ * @param sessionTimeoutMs maximum time in milliseconds to wait for the znode to be deleted
+ * @return true if path exists and its ephemeral owner is this client's session; false if it is absent or was deleted while waiting
+ * @throws KeeperException when failed to access zookeeper; a NodeExistsException if the znode is not deleted within the wait
+ * @throws InterruptedException interrupted when waiting for znode to be expired
+ */
+ public static boolean checkNodeAndWaitExpired(ZooKeeper zk,
+ String path,
+ long sessionTimeoutMs) throws KeeperException, InterruptedException {
+ final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+ Watcher zkPrevNodeWatcher = watchedEvent -> {
+ // only a NodeDeleted event releases the latch; every other event type is ignored.
+ if (EventType.NodeDeleted == watchedEvent.getType()) {
+ prevNodeLatch.countDown();
+ }
+ };
+ Stat stat = zk.exists(path, zkPrevNodeWatcher);
+ if (null != stat) {
+ // if the znode's ephemeral owner isn't the current zookeeper session,
+ // wait up to sessionTimeoutMs for the watcher to report its deletion
+ if (stat.getEphemeralOwner() != zk.getSessionId()) {
+ log.info("Previous znode : {} still exists, so waiting {} ms for znode deletion",
+ path, sessionTimeoutMs);
+ if (!prevNodeLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
+ throw new NodeExistsException(path);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}