You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by eo...@apache.org on 2021/10/16 11:34:57 UTC
[curator] branch master updated: Replace OutstandingOps with JDK
bundled Phaser (#365)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push:
new 1038aa6 Replace OutstandingOps with JDK bundled Phaser (#365)
1038aa6 is described below
commit 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6
Author: tison <wa...@gmail.com>
AuthorDate: Sat Oct 16 19:34:46 2021 +0800
Replace OutstandingOps with JDK bundled Phaser (#365)
---
.../framework/recipes/cache/CuratorCacheImpl.java | 18 +++++--
.../framework/recipes/cache/OutstandingOps.java | 55 ----------------------
2 files changed, 13 insertions(+), 60 deletions(-)
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 1446204..4f31e0e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@@ -58,7 +59,14 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
private final boolean clearOnClose;
private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
private final Consumer<Exception> exceptionHandler;
- private final OutstandingOps outstandingOps = new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
+
+ private final Phaser outstandingOps = new Phaser() {
+ @Override
+ protected boolean onAdvance(int phase, int registeredParties) {
+ callListeners(CuratorCacheListener::initialized);
+ return true;
+ }
+ };
private enum State
{
@@ -210,10 +218,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
{
handleException(event);
}
- outstandingOps.decrement();
+ outstandingOps.arriveAndDeregister();
};
- outstandingOps.increment();
+ outstandingOps.register();
client.getChildren().inBackground(callback).forPath(fromPath);
}
catch ( Exception e )
@@ -245,10 +253,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
{
handleException(event);
}
- outstandingOps.decrement();
+ outstandingOps.arriveAndDeregister();
};
- outstandingOps.increment();
+ outstandingOps.register();
if ( compressedData )
{
client.getData().decompressed().inBackground(callback).forPath(fromPath);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
deleted file mode 100644
index 4e7b540..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.cache;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-class OutstandingOps
-{
- private final AtomicReference<Runnable> completionProc;
- private final AtomicLong count = new AtomicLong(0);
- private volatile boolean active = true;
-
- OutstandingOps(Runnable completionProc)
- {
- this.completionProc = new AtomicReference<>(completionProc);
- }
-
- void increment()
- {
- if ( active )
- {
- count.incrementAndGet();
- }
- }
-
- void decrement()
- {
- if ( active && (count.decrementAndGet() == 0) )
- {
- Runnable proc = completionProc.getAndSet(null);
- if ( proc != null )
- {
- active = false;
- proc.run();
- }
- }
- }
-}