You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ti...@apache.org on 2023/09/14 18:07:10 UTC
[curator] 02/02: revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch CURATOR-690.-CuratorCache-fails-to-load-the-cache-if-there-are-more-than-64K-child-ZNodes
in repository https://gitbox.apache.org/repos/asf/curator.git
commit 18543788669af270b51af291aac6185068896ad5
Author: tison <wa...@gmail.com>
AuthorDate: Fri Sep 15 02:05:22 2023 +0800
revert to 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6 solution
Signed-off-by: tison <wa...@gmail.com>
---
.../framework/recipes/cache/CuratorCacheImpl.java | 35 +++-------------
.../framework/recipes/cache/OutstandingOps.java | 49 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 29 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 50760dd2..23268f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -61,28 +60,8 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
private final Consumer<Exception> exceptionHandler;
- private final Phaser rootPhaser = new Phaser() {
- @Override
- protected boolean onAdvance(int phase, int registeredParties) {
- callListeners(CuratorCacheListener::initialized);
- synchronized (CuratorCacheImpl.this) {
- currentChildPhaser = rootPhaser;
- }
- return true;
- }
- };
-
- private Phaser currentChildPhaser = new Phaser(rootPhaser);
-
- private synchronized Phaser getPhaserAndRegister() {
- if (currentChildPhaser.getRegisteredParties() >= 0xffff) {
- currentChildPhaser = new Phaser(rootPhaser);
- }
-
- currentChildPhaser.register();
-
- return currentChildPhaser;
- }
+ private final OutstandingOps outstandingOps =
+ new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
private enum State {
LATENT,
@@ -199,8 +178,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
return; // children haven't changed
}
- final Phaser outstandingOps = getPhaserAndRegister();
-
try {
BackgroundCallback callback = (__, event) -> {
if (event.getResultCode() == OK.intValue()) {
@@ -210,9 +187,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
} else {
handleException(event);
}
- outstandingOps.arriveAndDeregister();
+ outstandingOps.decrement();
};
+ outstandingOps.increment();
client.getChildren().inBackground(callback).forPath(fromPath);
} catch (Exception e) {
handleException(e);
@@ -224,8 +202,6 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
return;
}
- final Phaser outstandingOps = getPhaserAndRegister();
-
try {
BackgroundCallback callback = (__, event) -> {
if (event.getResultCode() == OK.intValue()) {
@@ -238,9 +214,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge {
} else {
handleException(event);
}
- outstandingOps.arriveAndDeregister();
+ outstandingOps.decrement();
};
+ outstandingOps.increment();
if (compressedData) {
client.getData().decompressed().inBackground(callback).forPath(fromPath);
} else {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
new file mode 100644
index 00000000..81a1ac26
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+class OutstandingOps { // counts in-flight operations and runs a completion callback once the count drops back to zero
+ private final AtomicReference<Runnable> completionProc; // callback to run on completion; swapped to null when taken
+ private final AtomicLong count = new AtomicLong(0); // increments not yet matched by a decrement while active
+ private volatile boolean active = true; // set to false just before the callback runs; afterwards both methods are no-ops
+
+ OutstandingOps(Runnable completionProc) { // completionProc: invoked from decrement() when the count reaches zero
+ this.completionProc = new AtomicReference<>(completionProc);
+ }
+
+ void increment() { // records one more outstanding operation
+ if (active) { // ignored once completion has fired
+ count.incrementAndGet();
+ }
+ }
+
+ void decrement() { // records one finished operation; fires the callback when none remain
+ if (active && (count.decrementAndGet() == 0)) { // short-circuit: the count is left untouched once inactive
+ Runnable proc = completionProc.getAndSet(null); // atomic take, so only one caller can obtain the non-null callback
+ if (proc != null) {
+ active = false;
+ proc.run(); // runs synchronously on the thread that called decrement()
+ }
+ }
+ }
+}