You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:45:04 UTC

[pulsar] 13/17: [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 926d6871436ba463d64440933a3b8a4aa85d15bf
Author: Pavel <rf...@gmail.com>
AuthorDate: Tue May 12 10:14:10 2020 +0300

    [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered (#6659)
    
    Fixes #6563
    
    ### Motivation
    Frequent topic creation/deletion triggers a reload of the ZooKeeper children cache for the z-nodes **/managed-ledgers/<tenant_name>/<cluster_name>/<namespace_name>/persistent** more often than needed.
    This puts additional load on ZooKeeper and the brokers, slows the brokers down, and makes them less stable. It also causes scalability issues: adding more brokers increases the duration of operations even further.
    
    * [ISSUE 6563][Broker] Invalidate managed ledgers zookeeper cache instead of reloading on watcher triggered
    
    * [ISSUE 6563][Broker] Adding license header to the new class
    
    * [ISSUE 6563][Broker] Invalidate correct zk cache path
    
    * [ISSUE 6563][Broker] Fix mocking issue
    
    Co-authored-by: Pavel Tishkevich <pa...@onde.app>
    Co-authored-by: penghui <pe...@apache.org>
    
    (cherry picked from commit f4fc7994071383748d1c5def92da2a652419c2c0)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  3 +-
 .../broker/cache/LocalZooKeeperCacheService.java   |  8 +--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  6 +-
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  4 --
 .../zookeeper/ZooKeeperManagedLedgerCache.java     | 71 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 42e7a7b..c49acbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -72,6 +72,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -526,7 +527,7 @@ public abstract class AdminResource extends PulsarWebResource {
         return pulsar().getConfigurationCache().clustersCache();
     }
 
-    protected ZooKeeperChildrenCache managedLedgerListCache() {
+    protected ZooKeeperManagedLedgerCache managedLedgerListCache() {
         return pulsar().getLocalZkCacheService().managedLedgerListCache();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 625f805..48b3f88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -54,7 +54,7 @@ public class LocalZooKeeperCacheService {
     private final ZooKeeperCache cache;
 
     private ZooKeeperDataCache<NamespaceEphemeralData> ownerInfoCache;
-    private ZooKeeperChildrenCache managedLedgerListCache;
+    private ZooKeeperManagedLedgerCache managedLedgerListCache;
     private ResourceQuotaCache resourceQuotaCache;
     private ZooKeeperDataCache<LocalPolicies> policiesCache;
 
@@ -118,7 +118,7 @@ public class LocalZooKeeperCacheService {
             }
         };
 
-        this.managedLedgerListCache = new ZooKeeperChildrenCache(cache, MANAGED_LEDGER_ROOT);
+        this.managedLedgerListCache = new ZooKeeperManagedLedgerCache(cache, MANAGED_LEDGER_ROOT);
         this.resourceQuotaCache = new ResourceQuotaCache(cache);
         this.resourceQuotaCache.initZK();
     }
@@ -244,7 +244,7 @@ public class LocalZooKeeperCacheService {
         return this.policiesCache;
     }
 
-    public ZooKeeperChildrenCache managedLedgerListCache() {
+    public ZooKeeperManagedLedgerCache managedLedgerListCache() {
         return this.managedLedgerListCache;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fe1ad87..bec3101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
+import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -254,7 +254,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String nonPartitionTopicName2 = "special-topic-partition-123";
         final String partitionedTopicName = "special-topic";
         LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
-        ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+        ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
@@ -272,7 +272,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String nonPartitionTopicName2 = "special-topic-partition-10";
         final String partitionedTopicName = "special-topic";
         LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
-        ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
+        ZooKeeperManagedLedgerCache mockZooKeeperChildrenCache = mock(ZooKeeperManagedLedgerCache.class);
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index 0bb6f46..ce64480 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -75,10 +75,6 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
         cache.invalidateChildren(path);
     }
 
-    public void clearTree() {
-        cache.invalidateRoot(path);
-    }
-
     @Override
     public void reloadCache(final String path) {
         cache.invalidate(path);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
new file mode 100644
index 0000000..0b2ab14
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperManagedLedgerCache.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class ZooKeeperManagedLedgerCache implements Watcher {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperManagedLedgerCache.class);
+
+    private final ZooKeeperCache cache;
+    private final String path;
+
+    public ZooKeeperManagedLedgerCache(ZooKeeperCache cache, String path) {
+        this.cache = cache;
+        this.path = path;
+    }
+
+    public Set<String> get(String path) throws KeeperException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getChildren called at: {}", path);
+        }
+
+        Set<String> children = cache.getChildrenAsync(path, this).join();
+        if (children == null) {
+            throw KeeperException.create(KeeperException.Code.NONODE);
+        }
+
+        return children;
+    }
+
+    public CompletableFuture<Set<String>> getAsync(String path) {
+        return cache.getChildrenAsync(path, this);
+    }
+
+    public void clearTree() {
+        cache.invalidateRoot(path);
+    }
+
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), watchedEvent);
+        String watchedEventPath = watchedEvent.getPath();
+        if (watchedEventPath != null) {
+            LOG.info("invalidate called in zookeeperChildrenCache for path {}", watchedEventPath);
+            cache.invalidate(watchedEventPath);
+        }
+    }
+}