You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/09/14 18:07:08 UTC

[curator] branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes created (now 18543788)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a change to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git


      at 18543788 revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution

This branch includes the following new commits:

     new 929a89f6 speed up tests
     new 18543788 revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[curator] 01/02: speed up tests

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 929a89f6f09329c8597f0483c4b035ebc6525d48
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 01:48:48 2023 +0800

    speed up tests
    
    Signed-off-by: tison <wa...@gmail.com>
---
 .../framework/recipes/cache/CuratorCacheImpl.java  |  8 ++++--
 .../framework/recipes/cache/TestCuratorCache.java  | 33 ++++++++++++++++------
 .../org/apache/curator/test/BaseClassForTests.java |  2 +-
 3 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 99913f4d..50760dd2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.curator.framework.recipes.cache;
 
-import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.*;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CHANGED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_CREATED;
+import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.Type.NODE_DELETED;
 import static org.apache.zookeeper.KeeperException.Code.NONODE;
 import static org.apache.zookeeper.KeeperException.Code.OK;
 import com.google.common.annotations.VisibleForTesting;
@@ -64,8 +66,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
         protected boolean onAdvance(int phase, int registeredParties) {
             callListeners(CuratorCacheListener::initialized);
             synchronized (CuratorCacheImpl.this) {
-                currentChildPhaser = rootPhaser; 
-            } 
+                currentChildPhaser = rootPhaser;
+            }
             return true;
         }
     };
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
index adfc0e41..fe37d674 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestCuratorCache.java
@@ -23,11 +23,14 @@ import static org.apache.curator.framework.recipes.cache.CuratorCache.Options.DO
 import static org.apache.curator.framework.recipes.cache.CuratorCacheListener.builder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.junit.jupiter.api.Tag;
@@ -186,7 +189,10 @@ public class TestCuratorCache extends CuratorTestBase {
         final CuratorCacheStorage storage = new StandardCuratorCacheStorage(false);
 
         // Phaser has a hard-limit of 64k registrants; we need to create more than that to trigger the initial problem.
-        final int zNodeCount = 0xffff + 5;
+        final int zNodeCount = 0xFFFF + 5;
+
+        // Bulk creations in multiOp for (1) speed up creations (2) not exceed jute.maxbuffer size.
+        final int bulkSize = 10000;
 
         try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {
@@ -194,20 +200,29 @@ public class TestCuratorCache extends CuratorTestBase {
             final CountDownLatch initializedLatch = new CountDownLatch(1);
             client.create().creatingParentsIfNeeded().forPath("/test");
 
-
+            final List<CuratorOp> creations = new ArrayList<>();
             for (int i = 0; i < zNodeCount; i++) {
-                client.create().forPath("/test/node_" + i);
+                creations.add(client.transactionOp().create().forPath("/test/node_" + i));
+                if (creations.size() > bulkSize) {
+                    client.transaction().forOperations(creations);
+                    creations.clear();
+                }
             }
+            client.transaction().forOperations(creations);
+            creations.clear();
 
-            try (CuratorCache cache = CuratorCache.builder(client, "/test").withStorage(storage).build()) {
-                cache.listenable()
-                        .addListener(builder()
-                                .forInitialized(() -> initializedLatch.countDown())
-                                .build());
+            try (CuratorCache cache =
+                    CuratorCache.builder(client, "/test").withStorage(storage).build()) {
+                final CuratorCacheListener listener =
+                        builder().forInitialized(initializedLatch::countDown).build();
+                cache.listenable().addListener(listener);
                 cache.start();
 
                 assertTrue(timing.awaitLatch(initializedLatch));
-                assertEquals(zNodeCount + 1, cache.size(), "Cache size should be equal to the number of zNodes created plus the root");
+                assertEquals(
+                        zNodeCount + 1,
+                        cache.size(),
+                        "Cache size should be equal to the number of zNodes created plus the root");
             }
         }
     }
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 76a87ad8..b66a7e52 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 public class BaseClassForTests {
     protected volatile TestingServer server;
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected final Logger log = LoggerFactory.getLogger(getClass());
     private final AtomicBoolean isRetrying = new AtomicBoolean(false);
 
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;


[curator] 02/02: revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 18543788669af270b51af291aac6185068896ad5
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 02:05:22 2023 +0800

    revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution
    
    Signed-off-by: tison <wa...@gmail.com>
---
 .../framework/recipes/cache/CuratorCacheImpl.java  | 35 +++-------------
 .../framework/recipes/cache/OutstandingOps.java    | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 29 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 50760dd2..23268f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
@@ -61,28 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
     private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
     private final Consumer<Exception> exceptionHandler;
 
-    private final Phaser rootPhaser = new Phaser() {
-        @Override
-        protected boolean onAdvance(int phase, int registeredParties) {
-            callListeners(CuratorCacheListener::initialized);
-            synchronized (CuratorCacheImpl.this) {
-                currentChildPhaser = rootPhaser;
-            }
-            return true;
-        }
-    };
-
-    private Phaser currentChildPhaser = new Phaser(rootPhaser);
-
-    private synchronized Phaser getPhaserAndRegister() {
-        if (currentChildPhaser.getRegisteredParties() >= 0xffff) {
-            currentChildPhaser = new Phaser(rootPhaser);
-        }
-
-        currentChildPhaser.register();
-
-        return currentChildPhaser;
-    }
+    private final OutstandingOps outstandingOps =
+            new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
 
     private enum State {
         LATENT,
@@ -199,8 +178,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
             return; // children haven't changed
         }
 
-        final Phaser outstandingOps = getPhaserAndRegister();
-
         try {
             BackgroundCallback callback = (__, event) -> {
                 if (event.getResultCode() == OK.intValue()) {
@@ -210,9 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
+            outstandingOps.increment();
             client.getChildren().inBackground(callback).forPath(fromPath);
         } catch (Exception e) {
             handleException(e);
@@ -224,8 +202,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
             return;
         }
 
-        final Phaser outstandingOps = getPhaserAndRegister();
-
         try {
             BackgroundCallback callback = (__, event) -> {
                 if (event.getResultCode() == OK.intValue()) {
@@ -238,9 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
+            outstandingOps.increment();
             if (compressedData) {
                 client.getData().decompressed().inBackground(callback).forPath(fromPath);
             } else {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
new file mode 100644
index 00000000..81a1ac26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+class OutstandingOps {
+    private final AtomicReference<Runnable> completionProc;
+    private final AtomicLong count = new AtomicLong(0);
+    private volatile boolean active = true;
+
+    OutstandingOps(Runnable completionProc) {
+        this.completionProc = new AtomicReference<>(completionProc);
+    }
+
+    void increment() {
+        if (active) {
+            count.incrementAndGet();
+        }
+    }
+
+    void decrement() {
+        if (active && (count.decrementAndGet() == 0)) {
+            Runnable proc = completionProc.getAndSet(null);
+            if (proc != null) {
+                active = false;
+                proc.run();
+            }
+        }
+    }
+}