You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/09/15 01:03:49 UTC
[curator] branch master updated: CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 51483adb CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)
51483adb is described below
commit 51483adb1d1dd3e43812b5d0ec7c03b6d1d05c7c
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 09:03:44 2023 +0800
CURATOR-690. CuratorCache fails to load the cache if there are more than 64k child znodes (#480)
Signed-off-by: tison <wa...@gmail.com>
Co-authored-by: Ryan Ruel <rr...@akamai.com>
---
.../framework/recipes/cache/CuratorCacheImpl.java | 22 ++++------
.../framework/recipes/cache/OutstandingOps.java | 49 ++++++++++++++++++++++
.../framework/recipes/cache/TestCuratorCache.java | 47 +++++++++++++++++++++
.../org/apache/curator/test/BaseClassForTests.java | 2 +-
4 files changed, 106 insertions(+), 14 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 3fccc37b..23268f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -19,7 +19,9 @@
package org.apache.curator.framework.recipes.cache;
-import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED;
import static org.apache.zookeeper.KeeperException.Code.NONODE;
import static org.apache.zookeeper.KeeperException.Code.OK;
import com.google.common.annotations.VisibleForTesting;
@@ -28,7 +30,6 @@ import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -59,13 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
private final Consumer<Exception> exceptionHandler;
- private final Phaser outstandingOps = new Phaser() {
- @Override
- protected boolean onAdvance(int phase, int registeredParties) {
- callListeners(CuratorCacheListener::initialized);
- return true;
- }
- };
+ private final OutstandingOps outstandingOps =
+ new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
private enum State {
LATENT,
@@ -191,10 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
} else {
handleException(event);
}
- outstandingOps.arriveAndDeregister();
+ outstandingOps.decrement();
};
- outstandingOps.register();
+ outstandingOps.increment();
client.getChildren().inBackground(callback).forPath(fromPath);
} catch (Exception e) {
handleException(e);
@@ -218,10 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
} else {
handleException(event);
}
- outstandingOps.arriveAndDeregister();
+ outstandingOps.decrement();
};
- outstandingOps.register();
+ outstandingOps.increment();
if (compressedData) {
client.getData().decompressed().inBackground(callback).forPath(fromPath);
} else {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
new file mode 100644
index 00000000..81a1ac26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+class OutstandingOps {
+ private final AtomicReference<Runnable> completionProc;
+ private final AtomicLong count = new AtomicLong(0);
+ private volatile boolean active = true;
+
+ OutstandingOps(Runnable completionProc) {
+ this.completionProc = new AtomicReference<>(completionProc);
+ }
+
+ void increment() {
+ if (active) {
+ count.incrementAndGet();
+ }
+ }
+
+ void decrement() {
+ if (active && (count.decrementAndGet() == 0)) {
+ Runnable proc = completionProc.getAndSet(null);
+ if (proc != null) {
+ active = false;
+ proc.run();
+ }
+ }
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index 411a51af..fe37d674 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -23,11 +23,14 @@ import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO
import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.junit.jupiter.api.Tag;
@@ -179,4 +182,48 @@ public class TestCuratorCache extends CuratorTestBase {
assertEquals(storage.size(), 0);
}
}
+
+ // CURATOR-690 - CuratorCache fails to load the cache if there are more than 64K child ZNodes
+ @Test
+ public void testGreaterThan64kZNodes() throws Exception {
+ final CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
+
+ // Phaser has a hard-limit of 64k registrants; we need to create more than that to trigger the initial problem.
+ final int zNodeCount = 0xFFFF + 5;
+
+ // Bulk creations in multiOp for (1) speed up creations (2) not exceed jute.maxbuffer size.
+ final int bulkSize = 10000;
+
+ try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+ server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {
+ client.start();
+ final CountDownLatch initializedLatch = new CountDownLatch(1);
+ client.create().creatingParentsIfNeeded().forPath("/test");
+
+ final List<CuratorOp> creations = new ArrayList<>();
+ for (int i = 0; i < zNodeCount; i++) {
+ creations.add(client.transactionOp().create().forPath("/test/node_" + i));
+ if (creations.size() > bulkSize) {
+ client.transaction().forOperations(creations);
+ creations.clear();
+ }
+ }
+ client.transaction().forOperations(creations);
+ creations.clear();
+
+ try (CuratorCache cache =
+ CuratorCache.builder(client, "/test").withStorage(storage).build()) {
+ final CuratorCacheListener listener =
+ builder().forInitialized(initializedLatch::countDown).build();
+ cache.listenable().addListener(listener);
+ cache.start();
+
+ assertTrue(timing.awaitLatch(initializedLatch));
+ assertEquals(
+ zNodeCount + 1,
+ cache.size(),
+ "Cache size should be equal to the number of zNodes created plus the root");
+ }
+ }
+ }
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 76a87ad8..b66a7e52 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public class BaseClassForTests {
protected volatile TestingServer server;
- private final Logger log = LoggerFactory.getLogger(getClass());
+ protected final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicBoolean isRetrying = new AtomicBoolean(false);
private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;