You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:45:04 UTC
[pulsar] 13/17: [ISSUE 6563][Broker] Invalidate managed ledgers
zookeeper cache instead of reloading on watcher triggered (#6659)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 926d6871436ba463d64440933a3b8a4aa85d15bf
Author: Pavel <rf...@gmail.com>
AuthorDate: Tue May 12 10:14:10 2020 +0300
[ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)
Fixes #6563
### Motivation
Frequent topic creation/deletion triggers reloading of the ZooKeeper children cache for the z-nodes **/managed-ledgers/<tenant_name>/<cluster_name>/<namespace_name>/persistent** more often than needed.
This creates additional load on ZooKeeper and the brokers, slows the brokers down, and makes them less stable. It also causes scalability issues: adding more brokers increases the duration of operations even further.
* [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered
* [ISSUE 6563][Broker] Adding licence header to the new class
* [ISSUE 6563][Broker] Invalidate correct zk cache path
* [ISSUE 6563][Broker] Fix mocking issue
Co-authored-by: Pavel Tishkevich <pa...@onde.app>
Co-authored-by: penghui <pe...@apache.org>(cherry picked from commit f4fc7994071383748d1c5def92da2a652419c2c0)
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 +-
.../broker/cache/LocalZooKeeperCacheService.java | 8 +--
.../pulsar/broker/admin/PersistentTopicsTest.java | 6 +-
.../pulsar/zookeeper/ZooKeeperChildrenCache.java | 4 --
.../zookeeper/ZooKeeperManagedLedgerCache.java | 71 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 42e7a7b..c49acbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
@@ -526,7 +527,7 @@ public abstract class AdminResource extends PulsarWebResource {
return pulsar().getConfigurationCache().clustersCache();
}
- protected ZooKeeperChildrenCache managedLedgerListCache() {
+ protected ZooKeeperManagedLedgerCache managedLedgerListCache() {
return pulsar().getLocalZkCacheService().managedLedgerListCache();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 625f805..48b3f88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -54,7 +54,7 @@ public class LocalZooKeeperCacheService {
private final ZooKeeperCache cache;
private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
- private ZooKeeperChildrenCache managedLedgerListCache;
+ private ZooKeeperManagedLedgerCache managedLedgerListCache;
private ResourceQuotaCache resourceQuotaCache;
private ZooKeeperDataCache<LocalPolicies> policiesCache;
@@ -118,7 +118,7 @@ public class LocalZooKeeperCacheService {
}
};
- this.managedLedgerListCache = new ZooKeeperChildrenCache(cache, MANAGED_LEDGER_ROOT);
+ this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
this.resourceQuotaCache = new ResourceQuotaCache(cache);
this.resourceQuotaCache.initZK();
}
@@ -244,7 +244,7 @@ public class LocalZooKeeperCacheService {
return this.policiesCache;
}
- public ZooKeeperChildrenCache managedLedgerListCache() {
+ public ZooKeeperManagedLedgerCache managedLedgerListCache() {
return this.managedLedgerListCache;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fe1ad87..bec3101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -254,7 +254,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final String nonPartitionTopicName2 = "special-topic-partition-123";
final String partitionedTopicName = "special-topic";
LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
- ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+ ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
@@ -272,7 +272,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
final String nonPartitionTopicName2 = "special-topic-partition-10";
final String partitionedTopicName = "special-topic";
LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
- ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+ ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 0bb6f46..ce64480 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -75,10 +75,6 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
cache.invalidateChildren(path);
}
- public void clearTree() {
- cache.invalidateRoot(path);
- }
-
@Override
public void reloadCache(final String path) {
cache.invalidate(path);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
new file mode 100644
index 0000000..0b2ab14
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Children cache for a z-node tree (constructed with the managed-ledger root), backed by a shared
+ * {@link ZooKeeperCache}.
+ *
+ * <p>The instance passes itself as the ZooKeeper {@link Watcher} on every children lookup. When a
+ * watch event arrives, it only invalidates the cache entry for the event's path; it does not
+ * eagerly reload the children list.
+ */
+public class ZooKeeperManagedLedgerCache implements Watcher {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperManagedLedgerCache.class);
+
+    private final ZooKeeperCache cache;
+    private final String path;
+
+    /**
+     * @param cache the underlying ZooKeeper cache that lookups and invalidations are delegated to
+     * @param path  the root path that {@link #clearTree()} invalidates
+     */
+    public ZooKeeperManagedLedgerCache(ZooKeeperCache cache, String path) {
+        this.cache = cache;
+        this.path = path;
+    }
+
+    /**
+     * Returns the children of the given z-node, blocking until the lookup completes.
+     *
+     * @param path the z-node whose children are requested
+     * @return the set of child names, never {@code null}
+     * @throws KeeperException if the asynchronous lookup failed with a ZooKeeper error, or if it
+     *                         completed with a {@code null} result (reported as {@code NONODE})
+     * @throws InterruptedException declared for API compatibility with existing callers
+     */
+    public Set<String> get(String path) throws KeeperException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getChildren called at: {}", path);
+        }
+
+        Set<String> children;
+        try {
+            children = cache.getChildrenAsync(path, this).join();
+        } catch (CompletionException e) {
+            // join() wraps asynchronous failures in an unchecked CompletionException. Unwrap
+            // ZooKeeper errors so callers that catch the declared KeeperException still see them.
+            if (e.getCause() instanceof KeeperException) {
+                throw (KeeperException) e.getCause();
+            }
+            throw e;
+        }
+        if (children == null) {
+            // Include the path so the failure can be traced to a concrete z-node.
+            throw KeeperException.create(KeeperException.Code.NONODE, path);
+        }
+
+        return children;
+    }
+
+    /**
+     * Asynchronous variant of {@link #get(String)}; this instance is passed as the watcher.
+     *
+     * @param path the z-node whose children are requested
+     * @return the future returned by the underlying cache lookup
+     */
+    public CompletableFuture<Set<String>> getAsync(String path) {
+        return cache.getChildrenAsync(path, this);
+    }
+
+    /**
+     * Invalidates the cache starting at the root path this instance was constructed with.
+     */
+    public void clearTree() {
+        cache.invalidateRoot(path);
+    }
+
+    /**
+     * Handles a ZooKeeper watch event by invalidating the cache entry for the event's path.
+     * Events without a path are logged and otherwise ignored.
+     *
+     * @param watchedEvent the event delivered by ZooKeeper
+     */
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), watchedEvent);
+        String watchedEventPath = watchedEvent.getPath();
+        if (watchedEventPath != null) {
+            LOG.info("invalidate called in ZooKeeperManagedLedgerCache for path {}", watchedEventPath);
+            cache.invalidate(watchedEventPath);
+        }
+    }
+}