You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/31 21:41:19 UTC

[1/5] curator git commit: refactoring

Repository: curator
Updated Branches:
  refs/heads/persistent-watch 076583d14 -> 38c766310


refactoring


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5b0a9f56
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5b0a9f56
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5b0a9f56

Branch: refs/heads/persistent-watch
Commit: 5b0a9f56e7d050eedfea0618f90c58d718441d3f
Parents: 076583d
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 14:09:53 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 14:09:53 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CacheAction.java    |  5 +-
 .../framework/recipes/watch/CacheFilter.java    |  2 +-
 .../framework/recipes/watch/CacheFilters.java   | 45 +++++++++++++++
 .../recipes/watch/CuratorCacheBuilder.java      | 23 ++++----
 .../recipes/watch/InternalCuratorCache.java     | 47 +++++++++++++---
 .../recipes/watch/InternalNodeCache.java        |  8 +--
 .../recipes/watch/NoDataCacheFilter.java        |  4 +-
 .../framework/recipes/watch/RefreshFilter.java  |  2 +-
 .../framework/recipes/watch/RefreshFilters.java | 36 ++++++++++++
 .../recipes/watch/SingleLevelCacheFilter.java   | 59 --------------------
 .../recipes/watch/SingleLevelRefreshFilter.java | 37 ------------
 .../recipes/watch/StandardCacheFilter.java      | 46 +++++++++++++++
 .../recipes/watch/StatsOnlyCacheFilter.java     | 28 ----------
 .../recipes/watch/TreeRefreshFilter.java        | 28 ----------
 .../watch/TestSingleLevelCuratorCache.java      |  5 +-
 15 files changed, 190 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
index a59fe99..b39457a 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheAction.java
@@ -22,6 +22,7 @@ public enum CacheAction
 {
     NOT_STORED,
     PATH_ONLY,
-    PATH_AND_DATA,
-    PATH_AND_COMPRESSED_DATA
+    STAT_ONLY,
+    STAT_AND_DATA,
+    STAT_AND_COMPRESSED_DATA
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
index 9923174..6cccc88 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilter.java
@@ -20,5 +20,5 @@ package org.apache.curator.framework.recipes.watch;
 
 public interface CacheFilter
 {
-    CacheAction actionForPath(String path);
+    CacheAction actionForPath(String mainPath, String checkPath);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
new file mode 100644
index 0000000..171dea4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -0,0 +1,45 @@
+package org.apache.curator.framework.recipes.watch;
+
+public class CacheFilters
+{
+    private static final CacheFilter statAndData = new StandardCacheFilter(CacheAction.STAT_AND_DATA);
+    private static final CacheFilter compressedStatAndData = new StandardCacheFilter(CacheAction.STAT_AND_COMPRESSED_DATA);
+    private static final CacheFilter statOnly = new StandardCacheFilter(CacheAction.STAT_ONLY);
+    private static final CacheFilter pathOnly = new StandardCacheFilter(CacheAction.PATH_ONLY);
+
+    public static CacheFilter statAndData()
+    {
+        return statAndData;
+    }
+
+    public static CacheFilter compressedData()
+    {
+        return compressedStatAndData;
+    }
+
+    public static CacheFilter statOnly()
+    {
+        return statOnly;
+    }
+
+    public static CacheFilter pathOnly()
+    {
+        return pathOnly;
+    }
+
+    public static CacheFilter full(final CacheAction cacheAction)
+    {
+        return new CacheFilter()
+        {
+            @Override
+            public CacheAction actionForPath(String mainPath, String checkPath)
+            {
+                return cacheAction;
+            }
+        };
+    }
+
+    private CacheFilters()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index e22aa1c..341ac23 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.watch;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import java.util.Objects;
@@ -28,18 +29,11 @@ public class CuratorCacheBuilder
     private final CuratorFramework client;
     private final String path;
     private CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
-    private boolean singleNode;
-    private RefreshFilter refreshFilter;
+    private boolean singleNode = false;
+    private RefreshFilter refreshFilter = null;
     private boolean sendRefreshEvents = true;
     private boolean refreshOnStart = true;
-    private CacheFilter cacheFilter = new CacheFilter()
-    {
-        @Override
-        public CacheAction actionForPath(String path)
-        {
-            return CacheAction.PATH_AND_DATA;
-        }
-    };
+    private CacheFilter cacheFilter = CacheFilters.statAndData();
 
     public static CuratorCacheBuilder builder(CuratorFramework client, String path)
     {
@@ -51,6 +45,7 @@ public class CuratorCacheBuilder
     {
         if ( singleNode )
         {
+            Preconditions.checkState(refreshFilter == null, "Single node caches do not use RefreshFilters");
             return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
         }
         return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
@@ -60,21 +55,23 @@ public class CuratorCacheBuilder
     {
         singleNode = true;
         refreshFilter = null;
+        cacheFilter = CacheFilters.statAndData();
         return this;
     }
 
     public CuratorCacheBuilder forSingleLevel()
     {
         singleNode = false;
-        refreshFilter = new SingleLevelRefreshFilter(path);
-        cacheFilter = new SingleLevelCacheFilter(path);
+        refreshFilter = RefreshFilters.singleLevel();
+        cacheFilter = CacheFilters.statAndData();
         return this;
     }
 
     public CuratorCacheBuilder forTree()
     {
         singleNode = false;
-        refreshFilter = new TreeRefreshFilter();
+        refreshFilter = RefreshFilters.tree();
+        cacheFilter = CacheFilters.statAndData();
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index aa47363..303ebb1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -46,7 +46,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
     {
         @Override
-        public boolean descend(String path)
+        public boolean descend(String mainPath, String checkPath)
         {
             return false;
         }
@@ -157,7 +157,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                     {
                         CacheAction cacheAction = (CacheAction)event.getContext();
                         CachedNode newNode = new CachedNode(event.getStat(), event.getData());
-                        CachedNode oldNode = cache.asMap().put(path, (cacheAction == CacheAction.PATH_ONLY) ? new CachedNode(event.getStat()) : newNode);
+                        CachedNode oldNode = putNewNode(path, cacheAction, newNode);
                         if ( oldNode == null )
                         {
                             notifyListeners(CacheEvent.NODE_CREATED, path, newNode);
@@ -187,7 +187,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             }
         };
 
-        CacheAction cacheAction = cacheFilter.actionForPath(path);
+        CacheAction cacheAction = cacheFilter.actionForPath(basePath, path);
         switch ( cacheAction )
         {
             case NOT_STORED:
@@ -196,8 +196,8 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                 break;
             }
 
-            case PATH_ONLY:
-            case PATH_AND_DATA:
+            case STAT_ONLY:
+            case STAT_AND_DATA:
             {
                 try
                 {
@@ -212,7 +212,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                 break;
             }
 
-            case PATH_AND_COMPRESSED_DATA:
+            case STAT_AND_COMPRESSED_DATA:
             {
                 try
                 {
@@ -228,7 +228,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             }
         }
 
-        if ( refreshFilter.descend(path) )
+        if ( refreshFilter.descend(basePath, path) )
         {
             refresher.increment();
             try
@@ -243,6 +243,39 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         }
     }
 
+    private CachedNode putNewNode(String path, CacheAction cacheAction, CachedNode newNode)
+    {
+        CachedNode putNode;
+        switch ( cacheAction )
+        {
+            default:
+            case NOT_STORED:
+            {
+                throw new IllegalStateException(String.format("Should not be here with action %s for path %s", cacheAction, path));
+            }
+
+            case PATH_ONLY:
+            {
+                putNode = nullNode;
+                break;
+            }
+
+            case STAT_ONLY:
+            {
+                putNode = new CachedNode(newNode.getStat());
+                break;
+            }
+
+            case STAT_AND_DATA:
+            case STAT_AND_COMPRESSED_DATA:
+            {
+                putNode = newNode;
+                break;
+            }
+        }
+        return cache.asMap().put(path, putNode);
+    }
+
     private void decrementOutstanding(SettableFuture<Boolean> task, AtomicInteger outstandingCount)
     {
         if ( outstandingCount.decrementAndGet() <= 0 )

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index b78480a..b4a7b16 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -179,7 +179,7 @@ class InternalNodeCache extends CuratorCacheBase
                 }
                 else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
-                    switch ( cacheFilter.actionForPath(path) )
+                    switch ( cacheFilter.actionForPath(path, path) )
                     {
                         default:
                         case NOT_STORED:
@@ -187,20 +187,20 @@ class InternalNodeCache extends CuratorCacheBase
                             throw new UnsupportedOperationException("Single node cache does not support action: IGNORE");
                         }
 
-                        case PATH_ONLY:
+                        case STAT_ONLY:
                         {
                             setNewData(nullNode);
                             break;
                         }
 
-                        case PATH_AND_DATA:
+                        case STAT_AND_DATA:
                         {
                             refresher.increment();
                             client.getData().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);
                             break;
                         }
 
-                        case PATH_AND_COMPRESSED_DATA:
+                        case STAT_AND_COMPRESSED_DATA:
                         {
                             refresher.increment();
                             client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
index b68402d..97e86da 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NoDataCacheFilter.java
@@ -21,8 +21,8 @@ package org.apache.curator.framework.recipes.watch;
 public class NoDataCacheFilter implements CacheFilter
 {
     @Override
-    public CacheAction actionForPath(String path)
+    public CacheAction actionForPath(String mainPath, String checkPath)
     {
-        return CacheAction.PATH_ONLY;
+        return CacheAction.STAT_ONLY;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
index 5b7fa7f..8a29826 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilter.java
@@ -20,5 +20,5 @@ package org.apache.curator.framework.recipes.watch;
 
 public interface RefreshFilter
 {
-    boolean descend(String path);
+    boolean descend(String mainPath, String checkPath);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
new file mode 100644
index 0000000..c1581c4
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -0,0 +1,36 @@
+package org.apache.curator.framework.recipes.watch;
+
+public class RefreshFilters
+{
+    private static final RefreshFilter singleLevel = new RefreshFilter()
+    {
+        @Override
+        public boolean descend(String mainPath, String checkPath)
+        {
+            return mainPath.equals(checkPath);
+        }
+    };
+
+    private static final RefreshFilter tree = new RefreshFilter()
+    {
+        @Override
+        public boolean descend(String mainPath, String checkPath)
+        {
+            return true;
+        }
+    };
+
+    public static RefreshFilter singleLevel()
+    {
+        return singleLevel;
+    }
+
+    public static RefreshFilter tree()
+    {
+        return tree;
+    }
+
+    private RefreshFilters()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
deleted file mode 100644
index 615a292..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelCacheFilter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-import org.apache.curator.utils.ZKPaths;
-
-public class SingleLevelCacheFilter implements CacheFilter
-{
-    private final String levelPath;
-    private final CacheAction defaultAction;
-    private final boolean isRoot;
-
-    public SingleLevelCacheFilter(String levelPath)
-    {
-        this(levelPath, CacheAction.PATH_AND_DATA);
-    }
-
-    public SingleLevelCacheFilter(String levelPath, CacheAction defaultAction)
-    {
-        this.levelPath = levelPath;
-        this.defaultAction = defaultAction;
-        isRoot = levelPath.equals(ZKPaths.PATH_SEPARATOR);
-    }
-
-    @Override
-    public CacheAction actionForPath(String path)
-    {
-        if ( isRoot && path.equals(ZKPaths.PATH_SEPARATOR) )    // special case. The parent of "/" is "/"
-        {
-            return CacheAction.NOT_STORED;
-        }
-        else if ( ZKPaths.getPathAndNode(path).getPath().equals(levelPath) )
-        {
-            return actionForMatchedPath();
-        }
-        return CacheAction.NOT_STORED;
-    }
-
-    protected CacheAction actionForMatchedPath()
-    {
-        return defaultAction;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
deleted file mode 100644
index f023a3e..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/SingleLevelRefreshFilter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-import java.util.Objects;
-
-public class SingleLevelRefreshFilter implements RefreshFilter
-{
-    private final String basePath;
-
-    public SingleLevelRefreshFilter(String basePath)
-    {
-        this.basePath = Objects.requireNonNull(basePath, "basePath cannot be null");
-    }
-
-    @Override
-    public boolean descend(String path)
-    {
-        return basePath.equals(path);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
new file mode 100644
index 0000000..e27355d
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StandardCacheFilter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.utils.ZKPaths;
+
+class StandardCacheFilter implements CacheFilter
+{
+    private final CacheAction cacheAction;
+
+    StandardCacheFilter(CacheAction cacheAction)
+    {
+        this.cacheAction = cacheAction;
+    }
+
+    @Override
+    public CacheAction actionForPath(String mainPath, String checkPath)
+    {
+        boolean mainPathIsRoot = mainPath.endsWith(ZKPaths.PATH_SEPARATOR);
+        if ( mainPathIsRoot && checkPath.equals(ZKPaths.PATH_SEPARATOR) )    // special case. The parent of "/" is "/"
+        {
+            return CacheAction.NOT_STORED;
+        }
+        else if ( ZKPaths.getPathAndNode(checkPath).getPath().equals(mainPath) )
+        {
+            return cacheAction;
+        }
+        return CacheAction.NOT_STORED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
deleted file mode 100644
index bba145a..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/StatsOnlyCacheFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-public class StatsOnlyCacheFilter implements CacheFilter
-{
-    @Override
-    public CacheAction actionForPath(String path)
-    {
-        return CacheAction.PATH_ONLY;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
deleted file mode 100644
index dda49b4..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/TreeRefreshFilter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes.watch;
-
-public class TreeRefreshFilter implements RefreshFilter
-{
-    @Override
-    public boolean descend(String path)
-    {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/5b0a9f56/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
index 4ccf3b7..c3cc327 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -284,8 +284,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
             final CountDownLatch updatedLatch = new CountDownLatch(1);
             final CountDownLatch addedLatch = new CountDownLatch(1);
             client.create().creatingParentsIfNeeded().forPath("/test");
-            SingleLevelCacheFilter cacheFilter = new SingleLevelCacheFilter("/test", CacheAction.PATH_ONLY);
-            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(cacheFilter).build();
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(CacheFilters.statOnly()).build();
             cache.getListenable().addListener(new CacheListener()
             {
                 @Override
@@ -724,7 +723,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
 
     private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
     {
-        CacheFilter cacheFilter = new SingleLevelCacheFilter("/test", cacheData ? CacheAction.PATH_AND_DATA : CacheAction.PATH_ONLY);
+        CacheFilter cacheFilter = cacheData ? CacheFilters.statAndData() : CacheFilters.statOnly();
         try (CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(cacheFilter).build() )
         {
             final CountDownLatch latch = new CountDownLatch(2);


[2/5] curator git commit: Added a composite cache

Posted by ra...@apache.org.
Added a composite cache


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/313fd7d4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/313fd7d4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/313fd7d4

Branch: refs/heads/persistent-watch
Commit: 313fd7d46ccede6bbc9ac1feb0b5a2099fce7a6d
Parents: 5b0a9f5
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 15:12:04 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 15:12:04 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CacheFilters.java   |  18 ++
 .../recipes/watch/CompositeCountDownLatch.java  |  82 ++++++
 .../recipes/watch/CompositeCuratorCache.java    | 282 +++++++++++++++++++
 .../watch/CompositeCuratorCacheProxy.java       |  53 ++++
 .../framework/recipes/watch/CuratorCache.java   |  11 +-
 .../recipes/watch/CuratorCacheBase.java         |   5 +-
 .../recipes/watch/CuratorCacheBuilder.java      |   8 +
 .../recipes/watch/InternalCuratorCache.java     |   4 +-
 .../framework/recipes/watch/RefreshFilters.java |  18 ++
 9 files changed, 468 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
index 171dea4..6d7a119 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 public class CacheFilters

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
new file mode 100644
index 0000000..5c75622
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCountDownLatch.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+class CompositeCountDownLatch extends CountDownLatch
+{
+    private final Collection<CountDownLatch> latches;
+
+    CompositeCountDownLatch(Collection<CountDownLatch> latches)
+    {
+        super(0);
+        this.latches = ImmutableList.copyOf(latches);
+    }
+
+    @Override
+    public void await() throws InterruptedException
+    {
+        for ( CountDownLatch latch : latches )
+        {
+            latch.await();
+        }
+    }
+
+    @Override
+    public boolean await(final long timeout, TimeUnit unit) throws InterruptedException
+    {
+        long timeoutMs = unit.toMillis(timeout);
+        for ( CountDownLatch latch : latches )
+        {
+            if ( timeoutMs < 0 )
+            {
+                return false;
+            }
+            long start = System.currentTimeMillis();
+            if ( !latch.await(timeoutMs, TimeUnit.MILLISECONDS) )
+            {
+                return false;
+            }
+            long elapsed = System.currentTimeMillis() - start;
+            timeoutMs -= elapsed;
+        }
+        return true;
+    }
+
+    @Override
+    public void countDown()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getCount()
+    {
+        long count = 0;
+        for ( CountDownLatch latch : latches )
+        {
+            count += latch.getCount();
+        }
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
new file mode 100644
index 0000000..c0163c5
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.listen.ListenerContainer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+public class CompositeCuratorCache implements CuratorCache
+{
+    private final Map<String, CuratorCache> caches;
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+    private final CacheListener listener = new CacheListener()
+    {
+        @Override
+        public void process(final CacheEvent event, final String path, final CachedNode affectedNode)
+        {
+            Function<CacheListener, Void> proc = new Function<CacheListener, Void>()
+            {
+                @Override
+                public Void apply(CacheListener listener)
+                {
+                    listener.process(event, path, affectedNode);
+                    return null;
+                }
+            };
+            listeners.forEach(proc);
+        }
+    };
+
+    public CompositeCuratorCache(CuratorCache... caches)
+    {
+        this(toMap(Arrays.asList(caches)));
+    }
+
+    public CompositeCuratorCache(Collection<CuratorCache> caches)
+    {
+        this(toMap(caches));
+    }
+
+    private static Map<String, CuratorCache> toMap(Collection<CuratorCache> caches)
+    {
+        Map<String, CuratorCache> map = new HashMap<>();
+        for ( CuratorCache cache : caches )
+        {
+            map.put(Integer.toString(map.size()), cache);
+        }
+        return map;
+    }
+
+    public CompositeCuratorCache(Map<String, CuratorCache> caches)
+    {
+        this.caches = ImmutableMap.copyOf(caches);
+    }
+
+    public Iterable<String> cacheKeys()
+    {
+        return caches.keySet();
+    }
+
+    public CuratorCache getCache(String key)
+    {
+        return caches.get(key);
+    }
+
+    @Override
+    public CountDownLatch start()
+    {
+        List<CountDownLatch> latches = new ArrayList<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            latches.add(cache.start());
+        }
+        return new CompositeCountDownLatch(latches);
+    }
+
+    @Override
+    public void close()
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            cache.getListenable().removeListener(listener);
+            cache.close();
+        }
+    }
+
+    @Override
+    public Listenable<CacheListener> getListenable()
+    {
+        return listeners;
+    }
+
+    @Override
+    public CountDownLatch refreshAll()
+    {
+        List<CountDownLatch> latches = new ArrayList<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            latches.add(cache.refreshAll());
+        }
+        return new CompositeCountDownLatch(latches);
+    }
+
+    @Override
+    public CountDownLatch refresh(String path)
+    {
+        List<CountDownLatch> latches = new ArrayList<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            latches.add(cache.refresh(path));
+        }
+        return new CompositeCountDownLatch(latches);
+    }
+
+    @Override
+    public boolean clear(String path)
+    {
+        boolean cleared = false;
+        for ( CuratorCache cache : caches.values() )
+        {
+            if ( cache.clear(path) )
+            {
+                cleared = true;
+            }
+        }
+        return cleared;
+    }
+
+    @Override
+    public void clearAll()
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            cache.clearAll();
+        }
+    }
+
+    @Override
+    public boolean exists(String path)
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            if ( cache.exists(path) )
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Set<String> paths()
+    {
+        Set<String> paths = new HashSet<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            cache.clearAll();
+        }
+        return paths;
+    }
+
+    @Override
+    public CachedNode get(String path)
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            CachedNode node = cache.get(path);
+            if ( node != null )
+            {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Iterable<CachedNode> getAll()
+    {
+        List<Iterable<CachedNode>> nodes = new ArrayList<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            nodes.add(cache.getAll());
+        }
+        return Iterables.concat(nodes);
+    }
+
+    @Override
+    public Iterable<Map.Entry<String, CachedNode>> entries()
+    {
+        List<Iterable<Map.Entry<String, CachedNode>>> nodes = new ArrayList<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            nodes.add(cache.entries());
+        }
+        return Iterables.concat(nodes);
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            if ( !cache.isEmpty() )
+            {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int size()
+    {
+        long size = 0;
+        for ( CuratorCache cache : caches.values() )
+        {
+            size += cache.size();
+        }
+        return (int)Math.min(size, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void clearDataBytes(String path)
+    {
+        for ( CuratorCache cache : caches.values() )
+        {
+            cache.clearDataBytes(path);
+        }
+    }
+
+    @Override
+    public boolean clearDataBytes(String path, int ifVersion)
+    {
+        boolean cleared = false;
+        for ( CuratorCache cache : caches.values() )
+        {
+            if ( cache.clearDataBytes(path, ifVersion) )
+            {
+                cleared = true;
+            }
+        }
+        return cleared;
+    }
+
+    @Override
+    public long refreshCount()
+    {
+        long count = 0;
+        for ( CuratorCache cache : caches.values() )
+        {
+            count = Math.min(count, cache.refreshCount());
+        }
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
new file mode 100644
index 0000000..17ca449
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCacheProxy.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class CompositeCuratorCacheProxy extends CompositeCuratorCache
+{
+    public CompositeCuratorCacheProxy(CuratorCache... caches)
+    {
+        super(caches);
+    }
+
+    public CompositeCuratorCacheProxy(Collection<CuratorCache> caches)
+    {
+        super(caches);
+    }
+
+    public CompositeCuratorCacheProxy(Map<String, CuratorCache> caches)
+    {
+        super(caches);
+    }
+
+    @Override
+    public CountDownLatch start()
+    {
+        return new CountDownLatch(0);
+    }
+
+    @Override
+    public void close()
+    {
+        // NOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index c8a6ea2..164ce29 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -104,20 +103,18 @@ public interface CuratorCache extends Closeable
     CachedNode get(String path);
 
     /**
-     * Returns the collection of node values in the cache. The returned set behaves in the same manner
-     * as {@link ConcurrentHashMap#values()}
+     * Returns the collection of node values in the cache.
      *
      * @return node values
      */
-    Collection<CachedNode> getAll();
+    Iterable<CachedNode> getAll();
 
     /**
-     * Returns the collection of node entries in the cache. The returned set behaves in the same manner
-     * as {@link ConcurrentHashMap#entrySet()}
+     * Returns the collection of node entries in the cache.
      *
      * @return node entries
      */
-    Set<Map.Entry<String, CachedNode>> entries();
+    Iterable<Map.Entry<String, CachedNode>> entries();
 
     /**
      * Returns true if the cache is currently empty. Use the result only as a reference. Concurrent

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 4803e69..3075c3b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -107,13 +106,13 @@ abstract class CuratorCacheBase implements CuratorCache
     }
 
     @Override
-    public final Collection<CachedNode> getAll()
+    public final Iterable<CachedNode> getAll()
     {
         return cache.asMap().values();
     }
 
     @Override
-    public final Set<Map.Entry<String, CachedNode>> entries()
+    public final Iterable<Map.Entry<String, CachedNode>> entries()
     {
         return cache.asMap().entrySet();
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index 341ac23..df068c0 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -75,6 +75,14 @@ public class CuratorCacheBuilder
         return this;
     }
 
+    public CuratorCacheBuilder forFull(CacheAction cacheAction)
+    {
+        singleNode = false;
+        refreshFilter = RefreshFilters.tree();
+        cacheFilter = CacheFilters.full(cacheAction);
+        return this;
+    }
+
     public CuratorCacheBuilder usingWeakValues()
     {
         cacheBuilder = cacheBuilder.weakValues();

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 303ebb1..950c336 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -124,9 +124,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     @Override
     public CountDownLatch refresh(String path)
     {
-        Preconditions.checkArgument(path.startsWith(basePath), "Path is not this cache's tree: " + path);
-
-        if ( isStarted() )
+        if ( isStarted() && path.startsWith(basePath) )
         {
             CountDownLatch latch = new CountDownLatch(1);
             Refresher refresher = new Refresher(this, path, latch);

http://git-wip-us.apache.org/repos/asf/curator/blob/313fd7d4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
index c1581c4..9ab1ca7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.curator.framework.recipes.watch;
 
 public class RefreshFilters


[5/5] curator git commit: More test porting, refinements

Posted by ra...@apache.org.
More test porting, refinements


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/38c76631
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/38c76631
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/38c76631

Branch: refs/heads/persistent-watch
Commit: 38c766310432bd1d6b3f64d2778b3605df434e64
Parents: f8f5caf
Author: randgalt <ra...@apache.org>
Authored: Sat Dec 31 16:41:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Dec 31 16:41:10 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CacheFilters.java   |  19 +-
 .../recipes/watch/CompositeCuratorCache.java    |  15 +-
 .../framework/recipes/watch/CuratorCache.java   |  16 +-
 .../recipes/watch/CuratorCacheBase.java         |  26 +-
 .../recipes/watch/CuratorCacheBuilder.java      |  19 +-
 .../recipes/watch/InternalCuratorCache.java     |  22 +-
 .../recipes/watch/NotifyingRefresher.java       |  43 ++++
 .../recipes/watch/PersistentWatcher.java        |  20 +-
 .../framework/recipes/watch/RefreshFilters.java |  25 ++
 .../framework/recipes/watch/Refresher.java      |  27 +-
 .../recipes/watch/BaseTestTreeCache.java        | 167 +++++++++++++
 .../framework/recipes/watch/EventAndNode.java   |  39 +++
 .../recipes/watch/TestEventOrdering.java        | 175 +++++++++++++
 .../TestSingleLevelCuratorCacheInCluster.java   | 117 +++++++++
 .../framework/recipes/watch/TestTreeCache.java  | 244 +++++++++++++++++++
 15 files changed, 924 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
index 6d7a119..dec16b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CacheFilters.java
@@ -24,6 +24,14 @@ public class CacheFilters
     private static final CacheFilter compressedStatAndData = new StandardCacheFilter(CacheAction.STAT_AND_COMPRESSED_DATA);
     private static final CacheFilter statOnly = new StandardCacheFilter(CacheAction.STAT_ONLY);
     private static final CacheFilter pathOnly = new StandardCacheFilter(CacheAction.PATH_ONLY);
+    private static final CacheFilter full = new CacheFilter()
+    {
+        @Override
+        public CacheAction actionForPath(String mainPath, String checkPath)
+        {
+            return CacheAction.STAT_AND_DATA;
+        }
+    };
 
     public static CacheFilter statAndData()
     {
@@ -45,16 +53,9 @@ public class CacheFilters
         return pathOnly;
     }
 
-    public static CacheFilter full(final CacheAction cacheAction)
+    public static CacheFilter full()
     {
-        return new CacheFilter()
-        {
-            @Override
-            public CacheAction actionForPath(String mainPath, String checkPath)
-            {
-                return cacheAction;
-            }
-        };
+        return full;
     }
 
     private CacheFilters()

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
index c0163c5..81373b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CompositeCuratorCache.java
@@ -176,12 +176,23 @@ public class CompositeCuratorCache implements CuratorCache
     }
 
     @Override
-    public Set<String> paths()
+    public Collection<String> paths()
     {
         Set<String> paths = new HashSet<>();
         for ( CuratorCache cache : caches.values() )
         {
-            cache.clearAll();
+            paths.addAll(cache.paths());
+        }
+        return paths;
+    }
+
+    @Override
+    public Collection<String> childNamesAtPath(String path)
+    {
+        Set<String> paths = new HashSet<>();
+        for ( CuratorCache cache : caches.values() )
+        {
+            paths.addAll(cache.childNamesAtPath(path));
         }
         return paths;
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index 164ce29..db6e17f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -20,9 +20,8 @@ package org.apache.curator.framework.recipes.watch;
 
 import org.apache.curator.framework.listen.Listenable;
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -87,12 +86,19 @@ public interface CuratorCache extends Closeable
     boolean exists(String path);
 
     /**
-     * Returns the set of paths in the cache. The returned set behaves in the same manner
-     * as {@link ConcurrentHashMap#keySet()}
+     * Returns an immutable view of paths in the cache.
      *
      * @return set of paths
      */
-    Set<String> paths();
+    Collection<String> paths();
+
+    /**
+     * Returns the set of child node names of the given node
+     *
+     * @param path node full path
+     * @return child names
+     */
+    Collection<String> childNamesAtPath(String path);
 
     /**
      * Return the node data stored for the path in the cache or null

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 3075c3b..1362679 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -20,12 +20,16 @@ package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.cache.Cache;
+import com.google.common.collect.Collections2;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.utils.ZKPaths;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +42,7 @@ abstract class CuratorCacheBase implements CuratorCache
     private final AtomicReference<CountDownLatch> initialRefreshLatch = new AtomicReference<>(new CountDownLatch(1));
     private final boolean sendRefreshEvents;
     private final AtomicInteger refreshCount = new AtomicInteger(0);
+    private Function<String, String> function;
 
     protected boolean isStarted()
     {
@@ -94,9 +99,24 @@ abstract class CuratorCacheBase implements CuratorCache
     }
 
     @Override
-    public final Set<String> paths()
+    public final Collection<String> paths()
     {
-        return cache.asMap().keySet();
+        return Collections.unmodifiableCollection(cache.asMap().keySet());
+    }
+
+    @Override
+    public Collection<String> childNamesAtPath(final String basePath)
+    {
+        function = new Function<String, String>()
+        {
+            @Override
+            public String apply(String path)
+            {
+                ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+                return pathAndNode.getPath().equals(basePath) ? pathAndNode.getNode() : null;
+            }
+        };
+        return Collections2.filter(Collections2.transform(paths(), function), Predicates.notNull());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
index df068c0..31633f0 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBuilder.java
@@ -34,6 +34,7 @@ public class CuratorCacheBuilder
     private boolean sendRefreshEvents = true;
     private boolean refreshOnStart = true;
     private CacheFilter cacheFilter = CacheFilters.statAndData();
+    private boolean sortChildren = true;
 
     public static CuratorCacheBuilder builder(CuratorFramework client, String path)
     {
@@ -48,7 +49,7 @@ public class CuratorCacheBuilder
             Preconditions.checkState(refreshFilter == null, "Single node caches do not use RefreshFilters");
             return new InternalNodeCache(client, path, cacheFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
         }
-        return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart);
+        return new InternalCuratorCache(client, path, cacheFilter, refreshFilter, cacheBuilder.<String, CachedNode>build(), sendRefreshEvents, refreshOnStart, sortChildren);
     }
 
     public CuratorCacheBuilder forSingleNode()
@@ -71,15 +72,7 @@ public class CuratorCacheBuilder
     {
         singleNode = false;
         refreshFilter = RefreshFilters.tree();
-        cacheFilter = CacheFilters.statAndData();
-        return this;
-    }
-
-    public CuratorCacheBuilder forFull(CacheAction cacheAction)
-    {
-        singleNode = false;
-        refreshFilter = RefreshFilters.tree();
-        cacheFilter = CacheFilters.full(cacheAction);
+        cacheFilter = CacheFilters.full();
         return this;
     }
 
@@ -131,6 +124,12 @@ public class CuratorCacheBuilder
         return this;
     }
 
+    public CuratorCacheBuilder sortingChildren(boolean sortChildren)
+    {
+        this.sortChildren = sortChildren;
+        return this;
+    }
+
     private CuratorCacheBuilder(CuratorFramework client, String path)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index a26f430..750545b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -29,6 +29,8 @@ import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Exchanger;
@@ -41,6 +43,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     private final String basePath;
     private final CacheFilter cacheFilter;
     private final RefreshFilter refreshFilter;
+    private final boolean sortChildren;
     private static final CachedNode nullNode = new CachedNode();
     private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
     {
@@ -51,21 +54,23 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         }
     };
 
-    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart)
+    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart, boolean sortChildren)
     {
         super(cache, sendRefreshEvents);
         this.client = Objects.requireNonNull(client, "client cannot be null");
         basePath = Objects.requireNonNull(path, "path cannot be null");
         this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
         this.refreshFilter = Objects.requireNonNull(refreshFilter, "primingFilter cannot be null");
+        this.sortChildren = sortChildren;
         watcher = new PersistentWatcher(client, path)
         {
             @Override
             protected void noteWatcherReset()
             {
-                if ( refreshOnStart || (refreshCount() > 0) )
+                long count = refreshCount();
+                if ( (refreshOnStart && (count == 0)) || (count > 0) )
                 {
-                    internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
+                    internalRefresh(basePath, new NotifyingRefresher(InternalCuratorCache.this, basePath), refreshFilter);
                 }
             }
         };
@@ -108,7 +113,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             case NodeCreated:
             case NodeDataChanged:
             {
-                internalRefresh(event.getPath(), new Refresher(InternalCuratorCache.this, basePath), nopRefreshFilter);
+                internalRefresh(event.getPath(), new Refresher(InternalCuratorCache.this), nopRefreshFilter);
                 break;
             }
         }
@@ -126,7 +131,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
         if ( isStarted() && path.startsWith(basePath) )
         {
             CountDownLatch latch = new CountDownLatch(1);
-            Refresher refresher = new Refresher(this, path, latch);
+            Refresher refresher = new NotifyingRefresher(this, path, latch);
             internalRefresh(path, refresher, refreshFilter);
             return latch;
         }
@@ -166,7 +171,12 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                     }
                     else if ( event.getType() == CuratorEventType.CHILDREN )
                     {
-                        for ( String child : event.getChildren() )
+                        List<String> children = event.getChildren();
+                        if ( sortChildren )
+                        {
+                            Collections.sort(children);
+                        }
+                        for ( String child : children )
                         {
                             internalRefresh(ZKPaths.makePath(path, child), refresher, refreshFilter);
                         }

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
new file mode 100644
index 0000000..c34c94b
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/NotifyingRefresher.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import java.util.concurrent.CountDownLatch;
+
+class NotifyingRefresher extends Refresher
+{
+    private final String refreshPath;
+
+    NotifyingRefresher(CuratorCacheBase cacheBase, String refreshPath)
+    {
+        this(cacheBase, refreshPath, null);
+    }
+
+    NotifyingRefresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
+    {
+        super(cacheBase, latch);
+        this.refreshPath = refreshPath;
+    }
+
+    protected void completed()
+    {
+        cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath, cacheBase.get(refreshPath));
+        super.completed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index 9bee7b1..ff7dc82 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -33,12 +33,14 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import java.io.Closeable;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PersistentWatcher implements Closeable
 {
     private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
     private final ListenerContainer<Watcher> listeners = new ListenerContainer<>();
+    private final AtomicBoolean isSet = new AtomicBoolean(false);
     private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
     {
         @Override
@@ -48,6 +50,10 @@ public class PersistentWatcher implements Closeable
             {
                 reset();
             }
+            else if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
+            {
+                isSet.set(false);
+            }
         }
     };
     private final Watcher watcher = new Watcher()
@@ -74,9 +80,19 @@ public class PersistentWatcher implements Closeable
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
         {
-            if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+            if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) )
             {
-                noteWatcherReset();
+                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
+                {
+                    if ( isSet.compareAndSet(false, true) )
+                    {
+                        noteWatcherReset();
+                    }
+                }
+                else
+                {
+                    isSet.set(false);
+                }
             }
         }
     };

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
index 9ab1ca7..a03d000 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/RefreshFilters.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.framework.recipes.watch;
 
+import org.apache.zookeeper.server.PathIterator;
+
 public class RefreshFilters
 {
     private static final RefreshFilter singleLevel = new RefreshFilter()
@@ -48,6 +50,29 @@ public class RefreshFilters
         return tree;
     }
 
+    public static RefreshFilter maxDepth(final int maxDepth)
+    {
+        return new RefreshFilter()
+        {
+            @Override
+            public boolean descend(String mainPath, String checkPath)
+            {
+                PathIterator pathIterator = new PathIterator(checkPath);
+                int thisDepth = 1;
+                while ( pathIterator.hasNext() )
+                {
+                    String thisParent = pathIterator.next();
+                    if ( thisParent.equals(mainPath) )
+                    {
+                        break;
+                    }
+                    ++thisDepth;
+                }
+                return (thisDepth <= maxDepth);
+            }
+        };
+    }
+
     private RefreshFilters()
     {
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
index fa2ae43..d9db264 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
@@ -23,21 +23,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 class Refresher
 {
-    private final CuratorCacheBase cacheBase;
-    private final String refreshPath;
+    protected final CuratorCacheBase cacheBase;
     private final CountDownLatch latch;
     private final AtomicInteger count = new AtomicInteger(0);
 
-    public Refresher(CuratorCacheBase cacheBase, String refreshPath)
+    Refresher(CuratorCacheBase cacheBase)
     {
-        this(cacheBase, refreshPath, null);
+        this(cacheBase, null);
     }
 
-    Refresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
+    Refresher(CuratorCacheBase cacheBase, CountDownLatch latch)
     {
-
         this.cacheBase = cacheBase;
-        this.refreshPath = refreshPath;
         this.latch = latch;
     }
 
@@ -50,12 +47,16 @@ class Refresher
     {
         if ( count.decrementAndGet() <= 0 )
         {
-            cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath, cacheBase.get(refreshPath));
-            if ( latch != null )
-            {
-                latch.countDown();
-            }
-            cacheBase.incrementRefreshCount();
+            completed();
+        }
+    }
+
+    protected void completed()
+    {
+        if ( latch != null )
+        {
+            latch.countDown();
         }
+        cacheBase.incrementRefreshCount();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
new file mode 100644
index 0000000..ab2e999
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/BaseTestTreeCache.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BaseTestTreeCache extends BaseClassForTests
+{
+    protected CuratorFramework client;
+    protected CuratorCache cache;
+    private final AtomicBoolean hadBackgroundException = new AtomicBoolean(false);
+    private final BlockingQueue<EventAndNode> events = new LinkedBlockingQueue<>();
+    private final Timing timing = new Timing();
+
+    /**
+     * Automatically records all events into an easily testable event stream.
+     */
+    final CacheListener eventListener = new CacheListener()
+    {
+        @Override
+        public void process(CacheEvent event, String path, CachedNode affectedNode)
+        {
+            if ( path.startsWith("/zookeeper") )
+            {
+                return;
+            }
+            events.add(new EventAndNode(event, path, affectedNode));
+        }
+    };
+
+    /**
+     * Construct a TreeCache that records exceptions and automatically listens.
+     */
+    protected CuratorCache newTreeCacheWithListeners(CuratorFramework client, String path)
+    {
+        CuratorCache result = CuratorCacheBuilder.builder(client, path).forTree().build();
+        result.getListenable().addListener(eventListener);
+        return result;
+    }
+
+    /**
+     * Finish constructing a TreeCache that records exceptions and automatically listens.
+     */
+    protected CuratorCache buildWithListeners(CuratorCacheBuilder builder)
+    {
+        CuratorCache result = builder.build();
+        result.getListenable().addListener(eventListener);
+        return result;
+    }
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception
+    {
+        super.setup();
+        initCuratorFramework();
+    }
+
+    void initCuratorFramework()
+    {
+        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+    }
+
+    @Override
+    @AfterMethod
+    public void teardown() throws Exception
+    {
+        try
+        {
+            try
+            {
+                Assert.assertFalse(hadBackgroundException.get(), "Background exceptions were thrown, see stderr for details");
+                assertNoMoreEvents();
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(cache);
+                TestCleanState.closeAndTestClean(client);
+            }
+        }
+        finally
+        {
+            super.teardown();
+        }
+    }
+
+    /**
+     * Asserts the event queue is empty.
+     */
+    void assertNoMoreEvents() throws InterruptedException
+    {
+        timing.sleepABit();
+        Assert.assertTrue(events.isEmpty(), String.format("Expected no events, found %d; first event: %s", events.size(), events.peek()));
+    }
+
+    /**
+     * Asserts the given event is next in the queue, and consumes it from the queue.
+     */
+    EventAndNode assertEvent(CacheEvent expectedType) throws InterruptedException
+    {
+        return assertEvent(expectedType, null);
+    }
+
+    /**
+     * Asserts the given event is next in the queue, and consumes it from the queue.
+     */
+    EventAndNode assertEvent(CacheEvent expectedType, String expectedPath) throws InterruptedException
+    {
+        return assertEvent(expectedType, expectedPath, null);
+    }
+
+    /**
+     * Asserts the given event is next in the queue, and consumes it from the queue.
+     */
+    EventAndNode assertEvent(CacheEvent expectedType, String expectedPath, byte[] expectedData) throws InterruptedException
+    {
+        EventAndNode event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+        Assert.assertNotNull(event, String.format("Expected type: %s, path: %s", expectedType, expectedPath));
+
+        String message = event.toString();
+        Assert.assertEquals(event.type, expectedType, message);
+        if ( expectedPath == null )
+        {
+            Assert.assertEquals(expectedType, event.type);
+        }
+        else
+        {
+            Assert.assertNotNull(event.node, message);
+            Assert.assertEquals(event.path, expectedPath, message);
+        }
+        if ( expectedData != null )
+        {
+            Assert.assertEquals(event.node.getData(), expectedData, message);
+        }
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
new file mode 100644
index 0000000..eb86216
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/EventAndNode.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+class EventAndNode
+{
+    final CacheEvent type;
+    final String path;
+    final CachedNode node;
+
+    EventAndNode(CacheEvent type, String path, CachedNode node)
+    {
+        this.type = type;
+        this.path = path;
+        this.node = node;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "EventAndNode{" + "type=" + type + ", path='" + path + '\'' + ", node=" + node + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
new file mode 100644
index 0000000..46ccf0a
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestEventOrdering.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class TestEventOrdering extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+    private final long start = System.currentTimeMillis();
+
+    private static final int THREAD_QTY = 100;
+    private static final int ITERATIONS = 100;
+    private static final int NODE_QTY = 10;
+
+    public enum EventType
+    {
+        ADDED,
+        DELETED
+    }
+
+    public static class Event
+    {
+        public final EventType eventType;
+        public final String path;
+        public final long time = System.currentTimeMillis();
+
+        public Event(EventType eventType, String path)
+        {
+            this.eventType = eventType;
+            this.path = path;
+        }
+    }
+
+    @Test
+    public void testEventOrdering() throws Exception
+    {
+        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_QTY);
+        BlockingQueue<Event> events = Queues.newLinkedBlockingQueue();
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        CuratorCache cache = null;
+        try
+        {
+            client.start();
+            client.create().forPath("/root");
+            cache = CuratorCacheBuilder.builder(client, "/root").build();
+
+            final Random random = new Random();
+            final Callable<Void> task = new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    for ( int i = 0; i < ITERATIONS; ++i )
+                    {
+                        String node = "/root/" + random.nextInt(NODE_QTY);
+                        try
+                        {
+                            switch ( random.nextInt(3) )
+                            {
+                            default:
+                            case 0:
+                                client.create().forPath(node);
+                                break;
+
+                            case 1:
+                                client.setData().forPath(node, "new".getBytes());
+                                break;
+
+                            case 2:
+                                client.delete().forPath(node);
+                                break;
+                            }
+                        }
+                        catch ( KeeperException ignore )
+                        {
+                            // ignore
+                        }
+                    }
+                    return null;
+                }
+            };
+
+            final CountDownLatch latch = new CountDownLatch(THREAD_QTY);
+            for ( int i = 0; i < THREAD_QTY; ++i )
+            {
+                Callable<Void> wrapped = new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            return task.call();
+                        }
+                        finally
+                        {
+                            latch.countDown();
+                        }
+                    }
+                };
+                executorService.submit(wrapped);
+            }
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            timing.sleepABit();
+
+            List<Event> localEvents = Lists.newArrayList();
+            int eventSuggestedQty = 0;
+            while ( events.size() > 0 )
+            {
+                Event event = events.take();
+                localEvents.add(event);
+                eventSuggestedQty += (event.eventType == EventType.ADDED) ? 1 : -1;
+            }
+            int actualQty = cache.size();
+            Assert.assertEquals(actualQty, eventSuggestedQty, String.format("actual %s expected %s:\n %s", actualQty, eventSuggestedQty, asString(localEvents)));
+        }
+        finally
+        {
+            executorService.shutdownNow();
+            //noinspection ThrowFromFinallyBlock
+            executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            CloseableUtils.closeQuietly(cache);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private String asString(List<Event> events)
+    {
+        int qty = 0;
+        StringBuilder str = new StringBuilder();
+        for ( Event event : events )
+        {
+            qty += (event.eventType == EventType.ADDED) ? 1 : -1;
+            str.append(event.eventType).append(" ").append(event.path).append(" @ ").append(event.time - start).append(' ').append(qty);
+            str.append("\n");
+        }
+        return str.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
new file mode 100644
index 0000000..3153764
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCacheInCluster.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestSingleLevelCuratorCacheInCluster
+{
+    private static final Timing timing = new Timing();
+
+    @Test
+    public void testServerLoss() throws Exception
+    {
+        CuratorFramework client = null;
+        CuratorCache cache = null;
+        TestingCluster cluster = new TestingCluster(3);
+        try
+        {
+            cluster.start();
+
+            client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test");
+
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+
+            final CountDownLatch resetLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectLatch = new CountDownLatch(1);
+            final CountDownLatch deleteLatch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(3);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.SUSPENDED )
+                    {
+                        resetLatch.countDown();
+                    }
+                    if ( newState.isConnected() )
+                    {
+                        reconnectLatch.countDown();
+                    }
+                }
+            });
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path, CachedNode affectedNode)
+                {
+                    if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        latch.countDown();
+                    }
+                    else if ( event == CacheEvent.NODE_DELETED )
+                    {
+                        deleteLatch.countDown();
+                    }
+                }
+            });
+            cache.start();
+
+            client.create().forPath("/test/one");
+            client.create().forPath("/test/two");
+            client.create().forPath("/test/three");
+
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+            InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
+            cluster.killServer(connectionInstance);
+
+            Assert.assertTrue(timing.awaitLatch(resetLatch));
+            Assert.assertEquals(cache.size(), 3);
+
+            Assert.assertTrue(timing.awaitLatch(reconnectLatch));
+            Assert.assertEquals(cache.size(), 3);
+
+            Assert.assertEquals(deleteLatch.getCount(), 1);
+            client.delete().forPath("/test/two");
+            Assert.assertTrue(timing.awaitLatch(deleteLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(cluster);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/38c76631/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
new file mode 100644
index 0000000..b4c5765
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestTreeCache.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestTreeCache extends BaseTestTreeCache
+{
+    @Test
+    public void testSelector() throws Exception
+    {
+        client.create().forPath("/root");
+        client.create().forPath("/root/n1-a");
+        client.create().forPath("/root/n1-b");
+        client.create().forPath("/root/n1-b/n2-a");
+        client.create().forPath("/root/n1-b/n2-b");
+        client.create().forPath("/root/n1-b/n2-b/n3-a");
+        client.create().forPath("/root/n1-c");
+        client.create().forPath("/root/n1-d");
+
+        CacheFilter cacheFilter = new CacheFilter()
+        {
+            @Override
+            public CacheAction actionForPath(String mainPath, String checkPath)
+            {
+                if ( checkPath.equals("/root/n1-c") )
+                {
+                    return CacheAction.NOT_STORED;
+                }
+                return CacheAction.STAT_AND_DATA;
+            }
+        };
+        RefreshFilter refreshFilter = new RefreshFilter()
+        {
+            @Override
+            public boolean descend(String mainPath, String checkPath)
+            {
+                return !checkPath.equals("/root/n1-b/n2-b");
+            }
+        };
+        cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/root").withCacheFilter(cacheFilter).withRefreshFilter(refreshFilter));
+        cache.start();
+
+        assertEvent(CacheEvent.NODE_CREATED, "/root");
+        assertEvent(CacheEvent.NODE_CREATED, "/root/n1-a");
+        assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b");
+        assertEvent(CacheEvent.NODE_CREATED, "/root/n1-d");
+        assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b/n2-a");
+        assertEvent(CacheEvent.NODE_CREATED, "/root/n1-b/n2-b");
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartup() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache = newTreeCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/1", "one".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/2", "two".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/3", "three".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/2/sub", "two-sub".getBytes());
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test")), Sets.newHashSet("1", "2", "3"));
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/1")), Sets.newHashSet());
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/2")), Sets.newHashSet("sub"));
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testStartEmpty() throws Exception
+    {
+        cache = newTreeCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+
+        client.create().forPath("/test");
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testStartEmptyDeeper() throws Exception
+    {
+        cache = newTreeCacheWithListeners(client, "/test/foo/bar");
+        cache.start();
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+
+        client.create().creatingParentsIfNeeded().forPath("/test/foo");
+        assertNoMoreEvents();
+        client.create().forPath("/test/foo/bar");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar");
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testDepth0() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test").forTree().withRefreshFilter(RefreshFilters.maxDepth(0)));
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/1"));
+        Assert.assertEquals(cache.childNamesAtPath("/test/1").size(), 0);
+        Assert.assertNull(cache.get("/test/non_exist"));
+    }
+
+    @Test
+    public void testDepth1() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/1", "one".getBytes());
+        client.create().forPath("/test/2", "two".getBytes());
+        client.create().forPath("/test/3", "three".getBytes());
+        client.create().forPath("/test/2/sub", "two-sub".getBytes());
+
+        cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/1", "one".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/2", "two".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/3", "three".getBytes());
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test")), Sets.newHashSet("1", "2", "3"));
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/1")), Sets.newHashSet());
+        Assert.assertEquals(Sets.newHashSet(cache.childNamesAtPath("/test/2")), Sets.newHashSet());
+        Assert.assertNull(cache.get("/test/2/sub"));
+        Assert.assertEquals(cache.childNamesAtPath("/test/2/sub").size(), 0);
+        Assert.assertEquals(cache.childNamesAtPath("/test/non_exist").size(), 0);
+    }
+
+    @Test
+    public void testDepth1Deeper() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/foo");
+        client.create().forPath("/test/foo/bar");
+        client.create().forPath("/test/foo/bar/1", "one".getBytes());
+        client.create().forPath("/test/foo/bar/2", "two".getBytes());
+        client.create().forPath("/test/foo/bar/3", "three".getBytes());
+        client.create().forPath("/test/foo/bar/2/sub", "two-sub".getBytes());
+
+        cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/test/foo/bar").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/1", "one".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/2", "two".getBytes());
+        assertEvent(CacheEvent.NODE_CREATED, "/test/foo/bar/3", "three".getBytes());
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newTreeCacheWithListeners(client, "/test");
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/one");
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+    }
+
+    @Test
+    public void testFromRoot() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = newTreeCacheWithListeners(client, "/");
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/");
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.NODE_CREATED, "/test/one");
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childNamesAtPath("/").contains("test"));
+        Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of("one"));
+        Assert.assertEquals(cache.childNamesAtPath("/test/one"), ImmutableSet.of());
+        Assert.assertEquals(new String(cache.get("/test/one").getData()), "hey there");
+    }
+
+    @Test
+    public void testFromRootWithDepth() throws Exception
+    {
+        client.create().forPath("/test");
+        client.create().forPath("/test/one", "hey there".getBytes());
+
+        cache = buildWithListeners(CuratorCacheBuilder.builder(client, "/").forTree().withRefreshFilter(RefreshFilters.maxDepth(1)));
+        cache.start();
+        assertEvent(CacheEvent.NODE_CREATED, "/");
+        assertEvent(CacheEvent.NODE_CREATED, "/test");
+        assertEvent(CacheEvent.CACHE_REFRESHED);
+        assertNoMoreEvents();
+
+        Assert.assertTrue(cache.childNamesAtPath("/").contains("test"));
+        Assert.assertEquals(cache.childNamesAtPath("/test"), ImmutableSet.of());
+        Assert.assertNull(cache.get("/test/one"));
+        Assert.assertEquals(cache.childNamesAtPath("/test/one").size(), 0);
+    }
+}


[3/5] curator git commit: renamed rebuildTestExchanger

Posted by ra...@apache.org.
renamed rebuildTestExchanger


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1f0bdf92
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1f0bdf92
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1f0bdf92

Branch: refs/heads/persistent-watch
Commit: 1f0bdf9265e6f5bfb34520761649240209c17d72
Parents: 313fd7d
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 16:26:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 16:26:10 2016 -0500

----------------------------------------------------------------------
 .../curator/framework/recipes/watch/InternalCuratorCache.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1f0bdf92/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 950c336..a26f430 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -19,7 +19,6 @@
 package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.curator.framework.CuratorFramework;
@@ -135,7 +134,7 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     }
 
     @VisibleForTesting
-    volatile Exchanger<Object> rebuildTestExchanger;
+    volatile Exchanger<Object> debugRebuildTestExchanger;
 
     private void internalRefresh(final String path, final Refresher refresher, final RefreshFilter refreshFilter)
     {
@@ -178,9 +177,9 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                     // TODO
                 }
                 refresher.decrement();
-                if ( rebuildTestExchanger != null )
+                if ( debugRebuildTestExchanger != null )
                 {
-                    rebuildTestExchanger.exchange(new Object());
+                    debugRebuildTestExchanger.exchange(new Object());
                 }
             }
         };


[4/5] curator git commit: finished ported tests

Posted by ra...@apache.org.
finished ported tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f8f5cafa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f8f5cafa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f8f5cafa

Branch: refs/heads/persistent-watch
Commit: f8f5cafa956da97c5fa177ac64ee003e955887da
Parents: 1f0bdf9
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 16:26:23 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 16:26:23 2016 -0500

----------------------------------------------------------------------
 .../watch/TestSingleLevelCuratorCache.java      | 211 +------------------
 1 file changed, 4 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f8f5cafa/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
index c3cc327..0305e55 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -24,14 +24,10 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.imps.TestCleanState;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
@@ -41,7 +37,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.testng.AssertJUnit.assertNotNull;
@@ -433,7 +428,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
                         }
                     }
                 });
-                ((InternalCuratorCache)cache).rebuildTestExchanger = new Exchanger<Object>();
+                ((InternalCuratorCache)cache).debugRebuildTestExchanger = new Exchanger<Object>();
                 ExecutorService service = Executors.newSingleThreadExecutor();
                 final AtomicReference<String> deletedPath = new AtomicReference<String>();
                 Future<Object> future = service.submit
@@ -443,7 +438,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
                             @Override
                             public Object call() throws Exception
                             {
-                                ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+                                ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
 
                                 // simulate another process adding a node while we're rebuilding
                                 client.create().forPath("/test/test");
@@ -454,7 +449,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
                                 client.delete().forPath("/test/bar");
                                 deletedPath.set("/test/bar");
 
-                                ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+                                ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
 
                                 CachedNode cachedNode = null;
                                 while ( cachedNode == null )
@@ -465,7 +460,7 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
                                 Assert.assertEquals(cachedNode.getData(), "original".getBytes());
                                 client.setData().forPath("/test/snafu", "grilled".getBytes());
 
-                                ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+                                ((InternalCuratorCache)cache).debugRebuildTestExchanger.exchange(new Object());
 
                                 return null;
                             }
@@ -677,50 +672,6 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
         }
     }
 
-    //@Test
-    public void testRebuildNode() throws Exception
-    {
-        PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            final AtomicInteger counter = new AtomicInteger();
-            final Semaphore semaphore = new Semaphore(1);
-            cache = new PathChildrenCache(client, "/test", true)
-            {
-                //@Override
-                void getDataAndStat(String fullPath) throws Exception
-                {
-                    semaphore.acquire();
-                    counter.incrementAndGet();
-                    //super.getDataAndStat(fullPath);
-                    latch.countDown();
-                }
-            };
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
-            Assert.assertTrue(timing.awaitLatch(latch));
-
-            int saveCounter = counter.get();
-            client.setData().forPath("/test/one", "alt".getBytes());
-            cache.rebuildNode("/test/one");
-            Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
-            Assert.assertEquals(saveCounter, counter.get());
-
-            semaphore.release(1000);
-            timing.sleepABit();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
     private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
     {
         CacheFilter cacheFilter = cacheData ? CacheFilters.statAndData() : CacheFilters.statOnly();
@@ -805,158 +756,4 @@ public class TestSingleLevelCuratorCache extends BaseClassForTests
             TestCleanState.closeAndTestClean(client);
         }
     }
-
-    //@Test
-    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-            final ExecutorService exec = Executors.newSingleThreadExecutor();
-            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-            {
-                cache.getListenable().addListener
-                    (
-                        new PathChildrenCacheListener()
-                        {
-                            @Override
-                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                            {
-                                if ( event.getData().getPath().equals("/test/one") )
-                                {
-                                    events.offer(event.getType());
-                                }
-                            }
-                        }
-                    );
-                cache.start();
-
-                final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-                try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
-                {
-                    cache2.getListenable().addListener(
-                        new PathChildrenCacheListener()
-                        {
-                            @Override
-                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
-                                throws Exception
-                            {
-                                if ( event.getData().getPath().equals("/test/one") )
-                                {
-                                    events2.offer(event.getType());
-                                }
-                            }
-                        }
-                                                      );
-                    cache2.start();
-
-                    client.create().forPath("/test/one", "hey there".getBytes());
-                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-
-                    client.setData().forPath("/test/one", "sup!".getBytes());
-                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                    Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
-                    Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
-
-                    client.delete().forPath("/test/one");
-                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                }
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testDeleteNodeAfterCloseDoesntCallExecutor()
-        throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
-            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-            {
-                cache.start();
-                client.create().forPath("/test/one", "hey there".getBytes());
-
-                cache.rebuild();
-                Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
-                Assert.assertTrue(exec.isExecuteCalled());
-
-                exec.setExecuteCalled(false);
-            }
-            Assert.assertFalse(exec.isExecuteCalled());
-
-            client.delete().forPath("/test/one");
-            timing.sleepABit();
-            Assert.assertFalse(exec.isExecuteCalled());
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-
-    }
-
-    /**
-     * Tests the case where there's an outstanding operation being executed when the cache is
-     * shut down. See CURATOR-121, this was causing misleading warning messages to be logged.
-     */
-    //@Test
-    public void testInterruptedOperationOnShutdown() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
-        client.start();
-
-        try
-        {
-            final CountDownLatch latch = new CountDownLatch(1);
-            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
-                @Override
-                protected void handleException(Throwable e)
-                {
-                    latch.countDown();
-                }
-            } )
-            {
-                cache.start();
-
-/*
-                cache.offerOperation(new Operation()
-                {
-
-                    @Override
-                    public void invoke() throws Exception
-                    {
-                        Thread.sleep(5000);
-                    }
-                });
-*/
-
-                Thread.sleep(1000);
-
-            }
-
-            latch.await(5, TimeUnit.SECONDS);
-
-            Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
 }