You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/09/15 01:03:49 UTC

[curator] branch master updated: CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 51483adb CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)
51483adb is described below

commit 51483adb1d1dd3e43812b5d0ec7c03b6d1d05c7c
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 09:03:44 2023 +0800

    CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)
    
    Signed-off-by: tison <wa...@gmail.com>
    Co-authored-by: Ryan Ruel <rr...@akamai.com>
---
 .../framework/recipes/cache/CuratorCacheImpl.java  | 22 ++++------
 .../framework/recipes/cache/OutstandingOps.java    | 49 ++++++++++++++++++++++
 .../framework/recipes/cache/TestCuratorCache.java  | 47 +++++++++++++++++++++
 .../org/apache/curator/test/BaseClassForTests.java |  2 +-
 4 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 3fccc37b..23268f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
-import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED;
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 import com.google.common.annotations.VisibleForTesting;
@@ -28,7 +30,6 @@ import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
@@ -59,13 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
     private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
     private final Consumer<Exception> exceptionHandler;
 
-    private final Phaser outstandingOps = new Phaser() {
-        @Override
-        protected boolean onAdvance(int phase, int registeredParties) {
-            callListeners(CuratorCacheListener::initialized);
-            return true;
-        }
-    };
+    private final OutstandingOps outstandingOps =
+            new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
 
     private enum State {
         LATENT,
@@ -191,10 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
-            outstandingOps.register();
+            outstandingOps.increment();
             client.getChildren().inBackground(callback).forPath(fromPath);
         } catch (Exception e) {
             handleException(e);
@@ -218,10 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
-            outstandingOps.register();
+            outstandingOps.increment();
             if (compressedData) {
                 client.getData().decompressed().inBackground(callback).forPath(fromPath);
             } else {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
new file mode 100644
index 00000000..81a1ac26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+class OutstandingOps {
+    private final AtomicReference<Runnable> completionProc;
+    private final AtomicLong count = new AtomicLong(0);
+    private volatile boolean active = true;
+
+    OutstandingOps(Runnable completionProc) {
+        this.completionProc = new AtomicReference<>(completionProc);
+    }
+
+    void increment() {
+        if (active) {
+            count.incrementAndGet();
+        }
+    }
+
+    void decrement() {
+        if (active && (count.decrementAndGet() == 0)) {
+            Runnable proc = completionProc.getAndSet(null);
+            if (proc != null) {
+                active = false;
+                proc.run();
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 411a51af..fe37d674 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -23,11 +23,14 @@ import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO
 import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.junit.jupiter.api.Tag;
@@ -179,4 +182,48 @@ public class TestCuratorCache extends CuratorTestBase {
             assertEquals(storage.size(), 0);
         }
     }
+
+    // CURATOR-690 - CuratorCache fails to load the cache if there are more than 64K child ZNodes
+    @Test
+    public void testGreaterThan64kZNodes() throws Exception {
+        final CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+
+        // Phaser has a hard-limit of 64k registrants; we need to create more than that to trigger the initial problem.
+        final int zNodeCount = 0xFFFF + 5;
+
+        // Bulk creations in multiOp for (1) speed up creations (2) not exceed jute.maxbuffer size.
+        final int bulkSize = 10000;
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {
+            client.start();
+            final CountDownLatch initializedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+
+            final List<CuratorOp> creations = new ArrayList<>();
+            for (int i = 0; i < zNodeCount; i++) {
+                creations.add(client.transactionOp().create().forPath("/test/node_" + i));
+                if (creations.size() > bulkSize) {
+                    client.transaction().forOperations(creations);
+                    creations.clear();
+                }
+            }
+            client.transaction().forOperations(creations);
+            creations.clear();
+
+            try (CuratorCache cache =
+                    CuratorCache.builder(client, "/test").withStorage(storage).build()) {
+                final CuratorCacheListener listener =
+                        builder().forInitialized(initializedLatch::countDown).build();
+                cache.listenable().addListener(listener);
+                cache.start();
+
+                assertTrue(timing.awaitLatch(initializedLatch));
+                assertEquals(
+                        zNodeCount + 1,
+                        cache.size(),
+                        "Cache size should be equal to the number of zNodes created plus the root");
+            }
+        }
+    }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 76a87ad8..b66a7e52 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 public class BaseClassForTests {
     protected volatile TestingServer server;
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicBoolean isRetrying = new AtomicBoolean(false);
 
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;