You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/01/18 23:53:39 UTC

curator git commit: Make sure NamespaceWatcherMap is cleared when the corresponding watcher is removed via new APIs. Added tests to ensure this.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-290 [created] adb4be47c


Make sure NamespaceWatcherMap is cleared when the corresponding watcher is removed via new APIs. Added tests to ensure this.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/adb4be47
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/adb4be47
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/adb4be47

Branch: refs/heads/CURATOR-290
Commit: adb4be47ca6e64962aae3067412cc76aa4b0cd22
Parents: ae8dc46
Author: randgalt <ra...@apache.org>
Authored: Mon Jan 18 17:53:27 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jan 18 17:53:27 2016 -0500

----------------------------------------------------------------------
 .../org/apache/curator/utils/DebugUtils.java    |  1 +
 .../framework/imps/NamespaceWatcherMap.java     | 11 ++++
 .../imps/RemoveWatchesBuilderImpl.java          | 10 ++-
 .../framework/imps/WatcherRemovalFacade.java    | 11 +++-
 .../framework/imps/WatcherRemovalManager.java   |  5 +-
 .../framework/imps/TestRemoveWatches.java       | 67 ++++++++++++--------
 .../framework/recipes/cache/TreeCache.java      |  2 +-
 .../apache/curator/test/BaseClassForTests.java  | 14 ++++
 8 files changed, 88 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
index 03f6903..beea726 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
@@ -25,6 +25,7 @@ public class DebugUtils
     public static final String PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems";
     public static final String PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level";
     public static final String PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = "curator-remove-watchers-in-foreground";
+    public static final String PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = "curator-validate-namespace-watcher-map-empty";
 
     private DebugUtils()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
index e5aecb2..00618e6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java
@@ -42,6 +42,11 @@ class NamespaceWatcherMap implements Closeable
     @Override
     public void close()
     {
+        clear();
+    }
+
+    void clear()
+    {
         map.clear();
     }
 
@@ -71,6 +76,12 @@ class NamespaceWatcherMap implements Closeable
         return map.remove(key);
     }
 
+    boolean removeWatcher(Object watcher)
+    {
+        //noinspection SuspiciousMethodCalls
+        return map.values().remove(watcher);
+    }
+
     @VisibleForTesting
     boolean isEmpty()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index f2666e6..c1772f1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -231,10 +231,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
             ZooKeeper zkClient = client.getZooKeeper();
             if(watcher == null)
             {
+                client.getNamespaceWatcherMap().clear();
                 zkClient.removeAllWatches(path, watcherType, local);    
             }
             else
             {
+                client.getNamespaceWatcherMap().removeWatcher(watcher);
                 zkClient.removeWatches(path, watcher, watcherType, local);
             }
         }
@@ -252,10 +254,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
                                 
                                 if(watcher == null)
                                 {
-                                    zkClient.removeAllWatches(path, watcherType, local);    
+                                    client.getNamespaceWatcherMap().clear();
+                                    zkClient.removeAllWatches(path, watcherType, local);
                                 }
                                 else
                                 {
+                                    client.getNamespaceWatcherMap().removeWatcher(watcher);
                                     zkClient.removeWatches(path, watcher, watcherType, local);
                                 }
                             }
@@ -304,10 +308,12 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat
         ZooKeeper zkClient = client.getZooKeeper();
         if(watcher == null)
         {
-            zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());    
+            client.getNamespaceWatcherMap().clear();
+            zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());
         }
         else
         {
+            client.getNamespaceWatcherMap().removeWatcher(watcher);
             zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext());
         }
         

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
index 371fc63..91530b4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.api.CuratorListener;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -41,7 +42,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
     {
         super(client);
         this.client = client;
-        removalManager = new WatcherRemovalManager(client);
+        removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap());
     }
 
     @Override
@@ -65,6 +66,14 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove
     public void removeWatchers()
     {
         removalManager.removeWatchers();
+
+        if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) )
+        {
+            if ( !getNamespaceWatcherMap().isEmpty() )
+            {
+                throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap());
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
index a691a94..064964d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -32,11 +32,13 @@ public class WatcherRemovalManager
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFrameworkImpl client;
+    private final NamespaceWatcherMap namespaceWatcherMap;
     private final Set<WrappedWatcher> entries = Sets.newHashSet();  // guarded by sync
 
-    WatcherRemovalManager(CuratorFrameworkImpl client)
+    WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap)
     {
         this.client = client;
+        this.namespaceWatcherMap = namespaceWatcherMap;
     }
 
     synchronized Watcher add(String path, Watcher watcher)
@@ -67,6 +69,7 @@ public class WatcherRemovalManager
             try
             {
                 log.debug("Removing watcher for path: " + entry.path);
+                namespaceWatcherMap.removeWatcher(entry.watcher);
                 RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
                 builder.internalRemoval(entry, entry.path);
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 4e02e95..a7c137a 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -18,12 +18,6 @@
  */
 package org.apache.curator.framework.imps;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -46,6 +40,9 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestRemoveWatches extends BaseClassForTests
 {
@@ -75,7 +72,8 @@ public class TestRemoveWatches extends BaseClassForTests
         {
             return true;
         }
-        
+
+        //noinspection SynchronizationOnLocalVariableOrMethodParameter
         synchronized(stateRef)
         {
             if(stateRef.get() == desiredState)
@@ -139,7 +137,7 @@ public class TestRemoveWatches extends BaseClassForTests
     public void testRemoveCuratorWatch() throws Exception
     {       
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -163,9 +161,11 @@ public class TestRemoveWatches extends BaseClassForTests
             };
                         
             client.checkExists().usingWatcher(watcher).forPath(path);
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
             
             client.watches().remove(watcher).forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally
@@ -178,7 +178,7 @@ public class TestRemoveWatches extends BaseClassForTests
     public void testRemoveWatch() throws Exception
     {       
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -192,9 +192,11 @@ public class TestRemoveWatches extends BaseClassForTests
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
             
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             client.watches().remove(watcher).forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally
@@ -207,7 +209,7 @@ public class TestRemoveWatches extends BaseClassForTests
     public void testRemoveWatchInBackgroundWithCallback() throws Exception
     {       
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -233,11 +235,12 @@ public class TestRemoveWatches extends BaseClassForTests
                 }
             };
             
-            
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
             
         }
@@ -251,7 +254,7 @@ public class TestRemoveWatches extends BaseClassForTests
     public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
     {       
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -264,9 +267,11 @@ public class TestRemoveWatches extends BaseClassForTests
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
             
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             client.watches().remove(watcher).inBackground().forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
             
         }
@@ -280,7 +285,7 @@ public class TestRemoveWatches extends BaseClassForTests
     public void testRemoveAllWatches() throws Exception
     {       
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -296,9 +301,11 @@ public class TestRemoveWatches extends BaseClassForTests
             
             client.getChildren().usingWatcher(watcher1).forPath(path);
             client.checkExists().usingWatcher(watcher2).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             client.watches().removeAll().forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally
@@ -376,7 +383,7 @@ public class TestRemoveWatches extends BaseClassForTests
     @Test
     public void testRemoveLocalWatch() throws Exception {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -393,14 +400,16 @@ public class TestRemoveWatches extends BaseClassForTests
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        
             
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
                        
             client.watches().removeAll().locally().forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally
@@ -412,7 +421,7 @@ public class TestRemoveWatches extends BaseClassForTests
     @Test
     public void testRemoveLocalWatchInBackground() throws Exception {
         Timing timing = new Timing();
-        CuratorFramework client = CuratorFrameworkFactory.builder().
+        CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.builder().
                 connectString(server.getConnectString()).
                 retryPolicy(new RetryOneTime(1)).
                 build();
@@ -429,14 +438,16 @@ public class TestRemoveWatches extends BaseClassForTests
             Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        
             
             client.checkExists().usingWatcher(watcher).forPath(path);
-            
+            Assert.assertTrue(!client.getNamespaceWatcherMap().isEmpty());
+
             //Stop the server so we can check if we can remove watches locally when offline
             server.stop();
             
             Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
                        
             client.watches().removeAll().locally().inBackground().forPath(path);
-            
+            Assert.assertTrue(client.getNamespaceWatcherMap().isEmpty());
+
             Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
         }
         finally

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 81590f7..a2f0e86 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -305,7 +305,7 @@ public class TreeCache implements Closeable
         {
             Stat oldStat = stat.getAndSet(null);
             byte[] oldData = data.getAndSet(null);
-            client.watches().remove(this).ofType(WatcherType.Any).inBackground().forPath(path);
+            client.watches().remove(this).ofType(WatcherType.Any).locally().inBackground().forPath(path);
 
             ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
             if ( childMap != null )

http://git-wip-us.apache.org/repos/asf/curator/blob/adb4be47/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index da1607c..a5afaf2 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -41,6 +41,7 @@ public class BaseClassForTests
     private static final int RETRY_WAIT_MS = 5000;
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
     private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
+    private static final String INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY;
 
     static
     {
@@ -67,6 +68,17 @@ public class BaseClassForTests
             e.printStackTrace();
         }
         INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = s;
+        s = null;
+        try
+        {
+            // use reflection to avoid adding a circular dependency in the pom
+            s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY").get(null);
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();
+        }
+        INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = s;
     }
 
     @BeforeSuite(alwaysRun = true)
@@ -107,6 +119,7 @@ public class BaseClassForTests
             System.setProperty(INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
         }
         System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
+        System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true");
 
         while ( server == null )
         {
@@ -125,6 +138,7 @@ public class BaseClassForTests
     @AfterMethod
     public void teardown() throws Exception
     {
+        System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY);
         System.clearProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND);
         if ( server != null )
         {