You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by dr...@apache.org on 2015/08/19 01:18:32 UTC

[16/31] curator git commit: CURATOR-161 - Modified the background processing framework to allow operations to indicate that a live connection is not required for them to execute (this is needed to run the remove watches operation with 'local' set to true). Cleaned up some unit tests.

CURATOR-161 - Modified the background processing framework to allow
operations to indicate that a live connection is not required for them to
execute (this is needed to run the remove watches operation with 'local'
set to true). Cleaned up some unit tests.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ba4da2c3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ba4da2c3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ba4da2c3

Branch: refs/heads/CURATOR-3.0
Commit: ba4da2c3c7048ea249f18e7b4c815db76f0b1ad0
Parents: 22d034a
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Thu May 14 09:19:09 2015 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Thu May 14 09:19:09 2015 +1000

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    |  2 +-
 .../framework/imps/OperationAndData.java        | 16 ++++-
 .../imps/RemoveWatchesBuilderImpl.java          | 22 ++++--
 .../framework/imps/TestRemoveWatches.java       | 73 ++++++++------------
 4 files changed, 58 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b4a1d93..c82f984 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -821,7 +821,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     {
         try
         {
-            if ( client.isConnected() )
+            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
             {
                 operationAndData.callPerformBackgroundOperation();
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@ class OperationAndData<T> implements Delayed, RetrySleeper
     private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
     private final long ordinal = nextOrdinal.getAndIncrement();
     private final Object context;
+    private final boolean connectionRequired;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
-
-    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
     {
         this.operation = operation;
         this.data = data;
         this.callback = callback;
         this.errorCallback = errorCallback;
         this.context = context;
+        this.connectionRequired = connectionRequired;
+    }      
+
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    {
+        this(operation, data, callback, errorCallback, context, true);
     }
 
     Object getContext()
     {
         return context;
     }
+    
+    boolean isConnectionRequired()
+    {
+        return connectionRequired;
+    }
 
     void callPerformBackgroundOperation() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 27d05da..932706b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -166,15 +166,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
     
     private void pathInBackground(final String path)
     {
-        OperationAndData.ErrorCallback<String>  errorCallback = new OperationAndData.ErrorCallback<String>()
+        OperationAndData.ErrorCallback<String>  errorCallback = null;
+        
+        //Only need an error callback if we're in guaranteed mode
+        if(guaranteed)
         {
-            @Override
-            public void retriesExhausted(OperationAndData<String> operationAndData)
+            errorCallback = new OperationAndData.ErrorCallback<String>()
             {
-                client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
-            }            
-        };        
-        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null);
+                @Override
+                public void retriesExhausted(OperationAndData<String> operationAndData)
+                {
+                    client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+                }            
+            };
+        }
+        
+        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(),
+                                                                       errorCallback, backgrounding.getContext(), !local), null);
     }
     
     private void pathInForeground(final String path) throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 518f13b..fc15f0c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -1,8 +1,9 @@
 package org.apache.curator.framework.imps;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -24,13 +25,30 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRemoveWatches extends BaseClassForTests
 {
+    private boolean blockUntilDesiredConnectionState(CuratorFramework client, Timing timing, final ConnectionState desiredState)
+    {
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+            
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if(newState == desiredState)
+                {
+                    latch.countDown();
+                }
+            }
+        });
+        
+        return timing.awaitLatch(latch);
+    }
+    
     @Test
     public void testRemoveCuratorDefaultWatcher() throws Exception
     {
@@ -330,7 +348,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
                        
             client.watches().removeAll().locally().forPath(path);
             
@@ -364,7 +382,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
                        
             client.watches().removeAll().locally().inBackground().forPath(path);
             
@@ -452,25 +470,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                       
             String path = "/";
             
             CountDownLatch removeLatch = new CountDownLatch(1);
@@ -479,7 +479,8 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             try 
@@ -510,25 +511,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                        
             final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
             
             ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
@@ -550,7 +533,7 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             client.watches().remove(watcher).guaranteed().inBackground().forPath(path);