You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by eo...@apache.org on 2021/10/16 11:34:57 UTC

[curator] branch master updated: Replace OutstandingOps with JDK bundled Phaser (#365)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 1038aa6  Replace OutstandingOps with JDK bundled Phaser (#365)
1038aa6 is described below

commit 1038aa6d03f240fb8a8de9736adc51ccbcc9f5b6
Author: tison <wa...@gmail.com>
AuthorDate: Sat Oct 16 19:34:46 2021 +0800

    Replace OutstandingOps with JDK bundled Phaser (#365)
---
 .../framework/recipes/cache/CuratorCacheImpl.java  | 18 +++++--
 .../framework/recipes/cache/OutstandingOps.java    | 55 ----------------------
 2 files changed, 13 insertions(+), 60 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
index 1446204..4f31e0e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
@@ -58,7 +59,14 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
     private final boolean clearOnClose;
     private final StandardListenerManager<CuratorCacheListener> listenerManager = StandardListenerManager.standard();
     private final Consumer<Exception> exceptionHandler;
-    private final OutstandingOps outstandingOps = new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized));
+
+    private final Phaser outstandingOps = new Phaser() {
+        @Override
+        protected boolean onAdvance(int phase, int registeredParties) {
+            callListeners(CuratorCacheListener::initialized);
+            return true;
+        }
+    };
 
     private enum State
     {
@@ -210,10 +218,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
                 {
                     handleException(event);
                 }
-                outstandingOps.decrement();
+                outstandingOps.arriveAndDeregister();
             };
 
-            outstandingOps.increment();
+            outstandingOps.register();
             client.getChildren().inBackground(callback).forPath(fromPath);
         }
         catch ( Exception e )
@@ -245,10 +253,10 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge
                 {
                     handleException(event);
                 }
-                outstandingOps.decrement();
+                outstandingOps.arriveAndDeregister();
             };
 
-            outstandingOps.increment();
+            outstandingOps.register();
             if ( compressedData )
             {
                 client.getData().decompressed().inBackground(callback).forPath(fromPath);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
deleted file mode 100644
index 4e7b540..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.cache;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-class OutstandingOps
-{
-    private final AtomicReference<Runnable> completionProc;
-    private final AtomicLong count = new AtomicLong(0);
-    private volatile boolean active = true;
-
-    OutstandingOps(Runnable completionProc)
-    {
-        this.completionProc = new AtomicReference<>(completionProc);
-    }
-
-    void increment()
-    {
-        if ( active )
-        {
-            count.incrementAndGet();
-        }
-    }
-
-    void decrement()
-    {
-        if ( active && (count.decrementAndGet() == 0) )
-        {
-            Runnable proc = completionProc.getAndSet(null);
-            if ( proc != null )
-            {
-                active = false;
-                proc.run();
-            }
-        }
-    }
-}