You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2020/03/28 00:07:39 UTC

[curator] branch CURATOR-549-zk36-persistent-watcher-recipes updated: x

This is an automated email from the ASF dual-hosted git repository.

randgalt pushed a commit to branch CURATOR-549-zk36-persistent-watcher-recipes
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/CURATOR-549-zk36-persistent-watcher-recipes by this push:
     new 7f8c525  x
7f8c525 is described below

commit 7f8c525335d2d035a9ebbc684716a602ef34f1b1
Author: randgalt <ra...@apache.org>
AuthorDate: Fri Mar 27 19:07:32 2020 -0500

    x
---
 .../framework/recipes/cache/OutstandingOps.java    | 29 +++++++++-------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
index d1b0cbe..4e7b540 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java
@@ -19,41 +19,36 @@
 package org.apache.curator.framework.recipes.cache;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 class OutstandingOps
 {
-    private final Runnable completionProc;
-    private volatile AtomicLong count = new AtomicLong(0);
+    private final AtomicReference<Runnable> completionProc;
+    private final AtomicLong count = new AtomicLong(0);
+    private volatile boolean active = true;
 
     OutstandingOps(Runnable completionProc)
     {
-        this.completionProc = completionProc;
+        this.completionProc = new AtomicReference<>(completionProc);
     }
 
     void increment()
     {
-        AtomicLong localCount = count;
-        if ( localCount != null )
+        if ( active )
         {
-            localCount.incrementAndGet();
+            count.incrementAndGet();
         }
     }
 
     void decrement()
     {
-        AtomicLong localCount = count;
-        if ( localCount != null )
+        if ( active && (count.decrementAndGet() == 0) )
         {
-            if ( (localCount.decrementAndGet() == 0) )
+            Runnable proc = completionProc.getAndSet(null);
+            if ( proc != null )
             {
-                count = null;
-                if ( localCount.compareAndSet(0, Long.MIN_VALUE) )
-                {
-                    // use Long.MIN_VALUE as a sentinel to avoid any races with the count.
-                    // Only 1 thread will successfully set count to Long.MIN_VALUE and
-                    // thus completionProc will only get called once
-                    completionProc.run();
-                }
+                active = false;
+                proc.run();
             }
         }
     }