You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/09/14 18:07:10 UTC

[curator] 02/02: revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git

commit 18543788669af270b51af291aac6185068896ad5
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 02:05:22 2023 +0800

    revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution
    
    Signed-off-by: tison <wa...@gmail.com>
---
 .../framework/recipes/cache/CuratorCacheImpl.java  | 35 +++-------------
 .../framework/recipes/cache/OutstandingOps.java    | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 29 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 50760dd2..23268f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
@@ -61,28 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
     private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
     private final Consumer<Exception> exceptionHandler;
 
-    private final Phaser rootPhaser = new Phaser() {
-        @Override
-        protected boolean onAdvance(int phase, int registeredParties) {
-            callListeners(CuratorCacheListener::initialized);
-            synchronized (CuratorCacheImpl.this) {
-                currentChildPhaser = rootPhaser;
-            }
-            return true;
-        }
-    };
-
-    private Phaser currentChildPhaser = new Phaser(rootPhaser);
-
-    private synchronized Phaser getPhaserAndRegister() {
-        if (currentChildPhaser.getRegisteredParties() >= 0xffff) {
-            currentChildPhaser = new Phaser(rootPhaser);
-        }
-
-        currentChildPhaser.register();
-
-        return currentChildPhaser;
-    }
+    private final OutstandingOps outstandingOps =
+            new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
 
     private enum State {
         LATENT,
@@ -199,8 +178,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
             return; // children haven't changed
         }
 
-        final Phaser outstandingOps = getPhaserAndRegister();
-
         try {
             BackgroundCallback callback = (__, event) -> {
                 if (event.getResultCode() == OK.intValue()) {
@@ -210,9 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
+            outstandingOps.increment();
             client.getChildren().inBackground(callback).forPath(fromPath);
         } catch (Exception e) {
             handleException(e);
@@ -224,8 +202,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
             return;
         }
 
-        final Phaser outstandingOps = getPhaserAndRegister();
-
         try {
             BackgroundCallback callback = (__, event) -> {
                 if (event.getResultCode() == OK.intValue()) {
@@ -238,9 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
                 } else {
                     handleException(event);
                 }
-                outstandingOps.arriveAndDeregister();
+                outstandingOps.decrement();
             };
 
+            outstandingOps.increment();
             if (compressedData) {
                 client.getData().decompressed().inBackground(callback).forPath(fromPath);
             } else {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
new file mode 100644
index 00000000..81a1ac26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+class OutstandingOps { // Counts in-flight operations and runs a one-shot callback when the count returns to zero.
+    private final AtomicReference<Runnable> completionProc; // callback holder; swapped to null when the callback is claimed
+    private final AtomicLong count = new AtomicLong(0); // increments not yet matched by a decrement (while active)
+    private volatile boolean active = true; // cleared once the callback has been claimed; later calls become no-ops
+
+    OutstandingOps(Runnable completionProc) { // completionProc: action to run the first time the count drops to zero
+        this.completionProc = new AtomicReference<>(completionProc);
+    }
+
+    void increment() { // records one more outstanding operation
+        if (active) { // no counting after completion has fired
+            count.incrementAndGet();
+        }
+    }
+
+    void decrement() { // records one finished operation; fires the callback if this was the last one
+        if (active && (count.decrementAndGet() == 0)) { // short-circuit: the counter is not touched once inactive
+            Runnable proc = completionProc.getAndSet(null); // atomic swap, so at most one caller obtains the callback
+            if (proc != null) {
+                active = false; // deactivate before invoking the callback
+                proc.run(); // invoked directly on the calling thread
+            }
+        }
+    }
+}