You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/30 17:03:48 UTC

[1/2] curator git commit: continued work on porting old PathChildrenCache tests

Repository: curator
Updated Branches:
  refs/heads/persistent-watch 01652cef6 -> bf73f0d39


http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
new file mode 100644
index 0000000..4bc423a
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -0,0 +1,961 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.testng.AssertJUnit.assertNotNull;
+
+public class TestSingleLevelCuratorCache extends BaseClassForTests
+{
+    private static final Timing timing = new Timing();
+
+    /**
+     * Starts a single-level cache while the ZooKeeper test server is stopped, restarts the
+     * server on the same port, and checks that the cache then reports and stores a node
+     * that is created afterwards.
+     *
+     * @throws Exception on any Curator/ZooKeeper failure
+     */
+    @Test
+    public void testWithBadConnect() throws Exception
+    {
+        final int serverPort = server.getPort();
+        server.close();
+
+        // declared outside the try block so the finally clause can always release it
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            cache = CuratorCacheBuilder.builder(client, "/").forSingleLevel().build();
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            CacheListener listener = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( (event == CacheEvent.NODE_CREATED) && path.equals("/baz"))
+                    {
+                        addedLatch.countDown();
+                    }
+                }
+            };
+            cache.getListenable().addListener(listener);
+            cache.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+
+            // bring the server back on the port the client was configured with
+            server = new TestingServer(serverPort, true);
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+            client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
+
+            assertNotNull("/baz does not exist", client.checkExists().forPath("/baz"));
+
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+            assertNotNull("cache doesn't see /baz", cache.get("/baz").getData());
+        }
+        finally
+        {
+            // previously the cache was left open; close it before the client it was built from
+            CloseableUtils.closeQuietly(cache);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Waits for {@code CACHE_REFRESHED} from a cache rooted at "/test", a path this test
+     * never creates or populates.
+     */
+    @Test
+    public void testPostInitializedForEmpty() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch refreshedLatch = new CountDownLatch(1);
+            CacheListener refreshListener = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    // only the refresh notification is of interest here
+                    if ( CacheEvent.CACHE_REFRESHED == event )
+                    {
+                        refreshedLatch.countDown();
+                    }
+                }
+            };
+
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+            cache.getListenable().addListener(refreshListener);
+            cache.start();
+
+            boolean refreshed = timing.awaitLatch(refreshedLatch);
+            Assert.assertTrue(refreshed);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Creates "/test/one" before the cache starts, then checks that the first event the
+     * listener receives is {@code NODE_CREATED} and that the latch returned by
+     * {@code refreshAll()} completes.
+     */
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/one", "hey there".getBytes());
+
+            final BlockingQueue<CacheEvent> received = new LinkedBlockingQueue<>();
+            CacheListener recorder = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    // record every event, regardless of type or path
+                    received.offer(event);
+                }
+            };
+
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
+            cache.getListenable().addListener(recorder);
+            cache.start();
+            CountDownLatch refreshLatch = cache.refreshAll();
+
+            CacheEvent first = received.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            Assert.assertEquals(first, CacheEvent.NODE_CREATED);
+
+            Assert.assertTrue(timing.awaitLatch(refreshLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Creates three children under "/test" before starting the cache, then expects three
+     * {@code NODE_CREATED} events, one {@code CACHE_REFRESHED} event, and the three
+     * payloads to be readable from the cache.
+     */
+    @Test
+    public void testChildrenInitialized() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+
+            final CountDownLatch createdLatch = new CountDownLatch(3);
+            final CountDownLatch refreshedLatch = new CountDownLatch(1);
+            CacheListener counter = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        createdLatch.countDown();
+                        return;
+                    }
+                    if ( event == CacheEvent.CACHE_REFRESHED )
+                    {
+                        refreshedLatch.countDown();
+                    }
+                }
+            };
+            cache.getListenable().addListener(counter);
+
+            // the children exist before the cache is started
+            client.create().forPath("/test/1", "1".getBytes());
+            client.create().forPath("/test/2", "2".getBytes());
+            client.create().forPath("/test/3", "3".getBytes());
+
+            cache.start();
+
+            Assert.assertTrue(timing.awaitLatch(createdLatch));
+            Assert.assertTrue(timing.awaitLatch(refreshedLatch));
+
+            int cachedCount = cache.size();
+            Assert.assertEquals(cachedCount, 3);
+            Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
+            Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
+            Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Same scenario as the three-children initialization test, but with refresh events
+     * switched off on the builder: the listener asserts it is never handed
+     * {@code CACHE_REFRESHED}.
+     */
+    @Test
+    public void testChildrenInitializedNormal() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
+
+            final CountDownLatch createdLatch = new CountDownLatch(3);
+            CacheListener counter = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    Assert.assertNotEquals(event, CacheEvent.CACHE_REFRESHED);
+                    if ( event != CacheEvent.NODE_CREATED )
+                    {
+                        return;
+                    }
+                    createdLatch.countDown();
+                }
+            };
+            cache.getListenable().addListener(counter);
+
+            // the children exist before the cache is started
+            client.create().forPath("/test/1", "1".getBytes());
+            client.create().forPath("/test/2", "2".getBytes());
+            client.create().forPath("/test/3", "3".getBytes());
+
+            cache.start();
+
+            Assert.assertTrue(timing.awaitLatch(createdLatch));
+
+            int cachedCount = cache.size();
+            Assert.assertEquals(cachedCount, 3);
+            Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
+            Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
+            Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Builds the cache with a {@code SingleLevelCacheFilter} using
+     * {@code CacheAction.PATH_ONLY} and checks that creating and then updating a child
+     * still produces {@code NODE_CREATED} followed by {@code NODE_CHANGED}.
+     */
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final CountDownLatch changedLatch = new CountDownLatch(1);
+            final CountDownLatch createdLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+
+            SingleLevelCacheFilter pathOnlyFilter = new SingleLevelCacheFilter("/test", CacheAction.PATH_ONLY);
+            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(pathOnlyFilter).build();
+
+            CacheListener eventListener = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( event == CacheEvent.NODE_CHANGED )
+                    {
+                        changedLatch.countDown();
+                        return;
+                    }
+                    if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        createdLatch.countDown();
+                    }
+                }
+            };
+            cache.getListenable().addListener(eventListener);
+            cache.start();
+
+            // step 1: a new child must be reported as created
+            client.create().forPath("/test/foo", "first".getBytes());
+            Assert.assertTrue(timing.awaitLatch(createdLatch));
+
+            // step 2: changing its payload must be reported as changed
+            client.setData().forPath("/test/foo", "something new".getBytes());
+            Assert.assertTrue(timing.awaitLatch(changedLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Deletes "/test/foo" and re-creates it with new data while the listener is still
+     * blocked inside its {@code NODE_DELETED} callback, then checks that the next event
+     * sees the payload "two" and that no unhandled error was reported to the client.
+     */
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+            client.create().forPath("/test/foo", "one".getBytes());
+
+            // remembers the most recent unhandled error reported by the client
+            final AtomicReference<Throwable> unhandled = new AtomicReference<>();
+            UnhandledErrorListener errorRecorder = new UnhandledErrorListener()
+            {
+                @Override
+                public void unhandledError(String message, Throwable e)
+                {
+                    unhandled.set(e);
+                }
+            };
+            client.getUnhandledErrorListenable().addListener(errorRecorder);
+
+            final CountDownLatch refreshedLatch = new CountDownLatch(1);
+            final CountDownLatch deletedLatch = new CountDownLatch(1);
+            final CountDownLatch recreatedLatch = new CountDownLatch(1);
+            final CountDownLatch verifiedLatch = new CountDownLatch(1);
+            try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+            {
+                CacheListener listener = new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        if ( event == CacheEvent.CACHE_REFRESHED )
+                        {
+                            refreshedLatch.countDown();
+                            return;
+                        }
+                        if ( refreshedLatch.getCount() != 0 )
+                        {
+                            // events seen before the first refresh are ignored
+                            return;
+                        }
+                        if ( event == CacheEvent.NODE_DELETED )
+                        {
+                            deletedLatch.countDown();
+                            // hold this callback until the test thread has re-created the node
+                            Assert.assertTrue(timing.awaitLatch(recreatedLatch));
+                            return;
+                        }
+                        try
+                        {
+                            Assert.assertEquals(cache.get(path).getData(), "two".getBytes());
+                        }
+                        finally
+                        {
+                            verifiedLatch.countDown();
+                        }
+                    }
+                };
+                cache.getListenable().addListener(listener);
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(refreshedLatch));
+
+                client.delete().forPath("/test/foo");
+                Assert.assertTrue(timing.awaitLatch(deletedLatch));
+                client.create().forPath("/test/foo", "two".getBytes());
+                recreatedLatch.countDown();
+                Assert.assertTrue(timing.awaitLatch(verifiedLatch));
+
+                Throwable failure = unhandled.get();
+                if ( failure != null )
+                {
+                    Assert.fail("Assert", failure);
+                }
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * (Currently disabled.) Uses the cache's {@code rebuildTestExchanger} hook to interleave
+     * a background task with the cache: the task creates "/test/test", deletes "/test/bar"
+     * and updates "/test/snafu" between exchanges, and the test then checks the cache
+     * reflects all three changes.
+     *
+     * @throws Exception on any Curator/ZooKeeper failure or if the background task fails
+     */
+    //@Test
+    public void testRebuildAgainstOtherProcesses() throws Exception
+    {
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+            client.create().forPath("/test/foo");
+            client.create().forPath("/test/bar");
+            client.create().forPath("/test/snafu", "original".getBytes());
+
+            final CountDownLatch addedLatch = new CountDownLatch(2);
+            try ( final CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+            {
+                cache.getListenable().addListener(new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        if ( event == CacheEvent.NODE_CREATED )
+                        {
+                            if ( path.equals("/test/test") )
+                            {
+                                addedLatch.countDown();
+                            }
+                        }
+                        else if ( event == CacheEvent.NODE_CHANGED )
+                        {
+                            if ( path.equals("/test/snafu") )
+                            {
+                                addedLatch.countDown();
+                            }
+                        }
+                    }
+                });
+                ((InternalCuratorCache)cache).rebuildTestExchanger = new Exchanger<Object>();
+                final AtomicReference<String> deletedPath = new AtomicReference<String>();
+                ExecutorService service = Executors.newSingleThreadExecutor();
+                try
+                {
+                    Future<Object> future = service.submit
+                        (
+                            new Callable<Object>()
+                            {
+                                @Override
+                                public Object call() throws Exception
+                                {
+                                    ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                    // simulate another process adding a node while we're rebuilding
+                                    client.create().forPath("/test/test");
+
+                                    Assert.assertTrue(cache.size() > 0);
+
+                                    // simulate another process removing a node while we're rebuilding
+                                    client.delete().forPath("/test/bar");
+                                    deletedPath.set("/test/bar");
+
+                                    ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                    // poll until the cache has loaded the node
+                                    CachedNode cachedNode = null;
+                                    while ( cachedNode == null )
+                                    {
+                                        cachedNode = cache.get("/test/snafu");
+                                        timing.sleepABit();
+                                    }
+                                    Assert.assertEquals(cachedNode.getData(), "original".getBytes());
+                                    client.setData().forPath("/test/snafu", "grilled".getBytes());
+
+                                    ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                    return null;
+                                }
+                            }
+                        );
+                    cache.start();
+                    future.get();
+                }
+                finally
+                {
+                    // previously the executor was never shut down, leaking its worker thread;
+                    // shutdownNow() also interrupts the task if the test is failing early
+                    service.shutdownNow();
+                }
+
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+                Assert.assertNotNull(cache.get("/test/test"));
+                Assert.assertNull(cache.get(deletedPath.get()));
+                Assert.assertEquals(cache.get("/test/snafu").getData(), "grilled".getBytes());
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Regression test; see https://github.com/Netflix/curator/issues/27 - was caused by not
+     * comparing old->new data.
+     * <p>
+     * Starts a cache over three existing children, deletes and re-creates "/base/a", and
+     * checks the exact sequence of events delivered to the listener.
+     *
+     * @throws Exception on any Curator/ZooKeeper failure
+     */
+    @Test
+    public void testIssue27() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/base");
+            client.create().forPath("/base/a");
+            client.create().forPath("/base/b");
+            client.create().forPath("/base/c");
+
+            client.getChildren().forPath("/base");
+
+            final List<CacheEvent> events = Lists.newArrayList();
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCacheBuilder.builder(client, "/base").forSingleLevel().sendingRefreshEvents(false).build();
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    // one permit per recorded event lets the test thread wait for exact counts
+                    events.add(event);
+                    semaphore.release();
+                }
+            });
+            cache.start();
+
+            // three initial children
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 3));
+
+            client.delete().forPath("/base/a");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+            client.create().forPath("/base/a");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+            List<CacheEvent> expected = Lists.newArrayList
+                (
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_DELETED,
+                    CacheEvent.NODE_CREATED
+                );
+            // TestNG's argument order is (actual, expected); the reverse yields misleading failure messages
+            Assert.assertEquals(events, expected);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Variant of the Issue 27 regression test using the new rebuild() method: events are
+     * recorded only once {@code cache.refreshCount()} is positive, so only the delete and
+     * re-create of "/base/a" are expected in the recorded sequence.
+     *
+     * @throws Exception on any Curator/ZooKeeper failure
+     */
+    @Test
+    public void testIssue27Alt() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/base");
+            client.create().forPath("/base/a");
+            client.create().forPath("/base/b");
+            client.create().forPath("/base/c");
+
+            client.getChildren().forPath("/base");
+
+            final List<CacheEvent> events = Lists.newArrayList();
+            final Semaphore semaphore = new Semaphore(0);
+            try ( final CuratorCache cache = CuratorCacheBuilder.builder(client, "/base").forSingleLevel().sendingRefreshEvents(false).build() )
+            {
+                cache.getListenable().addListener(new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        // skip everything delivered before the first refresh has been counted
+                        if ( cache.refreshCount() > 0 )
+                        {
+                            events.add(event);
+                            semaphore.release();
+                        }
+                    }
+                });
+                // start() returns a latch; wait on it before mutating the tree
+                Assert.assertTrue(timing.awaitLatch(cache.start()));
+
+                client.delete().forPath("/base/a");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+                client.create().forPath("/base/a");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+                List<CacheEvent> expected = Lists.newArrayList(CacheEvent.NODE_DELETED, CacheEvent.NODE_CREATED);
+                // TestNG's argument order is (actual, expected); the reverse yields misleading failure messages
+                Assert.assertEquals(events, expected);
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    // Disabled, not yet ported: still drives the old PathChildrenCache. Creates an ephemeral
+    // child, kills the session, and waits for the lost / reconnected / child-removed events.
+    //@Test
+    public void testKilledSession() throws Exception
+    {
+        CuratorFramework client = null;
+        PathChildrenCache cache = null;
+        try
+        {
+            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().forPath("/test");
+
+            cache = new PathChildrenCache(client, "/test", true);
+            cache.start();
+
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            final CountDownLatch connectionLostLatch = new CountDownLatch(1);
+            final CountDownLatch connectionBackLatch = new CountDownLatch(1);
+            final CountDownLatch childGoneLatch = new CountDownLatch(1);
+            PathChildrenCacheListener listener = new PathChildrenCacheListener()
+            {
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                {
+                    // each event type of interest releases its own latch
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+                    {
+                        addedLatch.countDown();
+                        return;
+                    }
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
+                    {
+                        connectionLostLatch.countDown();
+                        return;
+                    }
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+                    {
+                        connectionBackLatch.countDown();
+                        return;
+                    }
+                    if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+                    {
+                        childGoneLatch.countDown();
+                    }
+                }
+            };
+            cache.getListenable().addListener(listener);
+
+            client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(connectionLostLatch));
+            Assert.assertTrue(timing.awaitLatch(connectionBackLatch));
+            Assert.assertTrue(timing.awaitLatch(childGoneLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    // Disabled, not yet ported: runs the mode check once without and once with data caching,
+    // removing the two children it created after each pass.
+    //@Test
+    public void testModes() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            boolean[] cacheDataModes = {false, true};
+            for ( int i = 0; i < cacheDataModes.length; ++i )
+            {
+                internalTestMode(client, cacheDataModes[i]);
+
+                // reset the tree so the next pass can create the same children again
+                client.delete().forPath("/test/one");
+                client.delete().forPath("/test/two");
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    // Disabled, not yet ported: still drives the old PathChildrenCache. Checks that
+    // rebuildNode() picks up new data for "/test/one" without going through the
+    // (stubbed-out) getDataAndStat() hook again.
+    //@Test
+    public void testRebuildNode() throws Exception
+    {
+        PathChildrenCache cache = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
+
+            final CountDownLatch firstCallLatch = new CountDownLatch(1);
+            final AtomicInteger callCount = new AtomicInteger();
+            final Semaphore gate = new Semaphore(1);
+            cache = new PathChildrenCache(client, "/test", true)
+            {
+                //@Override
+                void getDataAndStat(String fullPath) throws Exception
+                {
+                    // the single initial permit lets exactly one call through until the test opens the gate
+                    gate.acquire();
+                    callCount.incrementAndGet();
+                    //super.getDataAndStat(fullPath);
+                    firstCallLatch.countDown();
+                }
+            };
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+            Assert.assertTrue(timing.awaitLatch(firstCallLatch));
+
+            int callsBeforeRebuild = callCount.get();
+            client.setData().forPath("/test/one", "alt".getBytes());
+            cache.rebuildNode("/test/one");
+            Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
+            Assert.assertEquals(callsBeforeRebuild, callCount.get());
+
+            // open the gate so any blocked callers can finish before shutdown
+            gate.release(1000);
+            timing.sleepABit();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    /**
+     * Creates two children under "/test" while a {@code PathChildrenCache} is running and
+     * checks each cached entry: the stat must always be present, the data only when
+     * {@code cacheData} is set.
+     *
+     * @param client    started client used for both the cache and the node creation
+     * @param cacheData value passed to the {@code PathChildrenCache} constructor; also selects
+     *                  whether cached data is expected to be non-null or null
+     * @throws Exception on any Curator/ZooKeeper failure
+     */
+    private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
+    {
+        try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) )
+        {
+            final CountDownLatch bothAddedLatch = new CountDownLatch(2);
+            PathChildrenCacheListener addListener = new PathChildrenCacheListener()
+            {
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                {
+                    if ( event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED )
+                    {
+                        return;
+                    }
+                    bothAddedLatch.countDown();
+                }
+            };
+            cache.getListenable().addListener(addListener);
+            cache.start();
+
+            client.create().forPath("/test/one", "one".getBytes());
+            client.create().forPath("/test/two", "two".getBytes());
+            Assert.assertTrue(bothAddedLatch.await(10, TimeUnit.SECONDS));
+
+            for ( ChildData child : cache.getCurrentData() )
+            {
+                if ( cacheData )
+                {
+                    Assert.assertNotNull(child.getData());
+                }
+                else
+                {
+                    Assert.assertNull(child.getData());
+                }
+                // the stat is expected in both modes
+                Assert.assertNotNull(child.getStat());
+            }
+        }
+    }
+
+    /**
+     * Walks "/test/one" through create, update and delete and checks that the listener
+     * receives {@code NODE_CREATED}, {@code NODE_CHANGED} and {@code NODE_DELETED} in that
+     * order, with the cache holding the updated payload after the change.
+     */
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<CacheEvent> nodeEvents = new LinkedBlockingQueue<>();
+            try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+            {
+                CacheListener nodeOneRecorder = new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        // events for any other path are not recorded
+                        if ( path.equals("/test/one") )
+                        {
+                            nodeEvents.offer(event);
+                        }
+                    }
+                };
+                cache.getListenable().addListener(nodeOneRecorder);
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                CacheEvent afterCreate = nodeEvents.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+                Assert.assertEquals(afterCreate, CacheEvent.NODE_CREATED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                CacheEvent afterUpdate = nodeEvents.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+                Assert.assertEquals(afterUpdate, CacheEvent.NODE_CHANGED);
+                Assert.assertTrue(cache.exists("/test/one"));
+                String cachedText = new String(cache.get("/test/one").getData());
+                Assert.assertEquals(cachedText, "sup!");
+
+                client.delete().forPath("/test/one");
+                CacheEvent afterDelete = nodeEvents.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+                Assert.assertEquals(afterDelete, CacheEvent.NODE_DELETED);
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            final ExecutorService exec = Executors.newSingleThreadExecutor();
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
+                        {
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                            {
+                                if ( event.getData().getPath().equals("/test/one") )
+                                {
+                                    events.offer(event.getType());
+                                }
+                            }
+                        }
+                    );
+                cache.start();
+
+                final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+                try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
+                {
+                    cache2.getListenable().addListener(
+                        new PathChildrenCacheListener()
+                        {
+                            @Override
+                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                                throws Exception
+                            {
+                                if ( event.getData().getPath().equals("/test/one") )
+                                {
+                                    events2.offer(event.getType());
+                                }
+                            }
+                        }
+                                                      );
+                    cache2.start();
+
+                    client.create().forPath("/test/one", "hey there".getBytes());
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                    client.setData().forPath("/test/one", "sup!".getBytes());
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+                    Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+                    client.delete().forPath("/test/one");
+                    Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                    Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                }
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor()
+        throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+            {
+                cache.start();
+                client.create().forPath("/test/one", "hey there".getBytes());
+
+                cache.rebuild();
+                Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+                Assert.assertTrue(exec.isExecuteCalled());
+
+                exec.setExecuteCalled(false);
+            }
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            client.delete().forPath("/test/one");
+            timing.sleepABit();
+            Assert.assertFalse(exec.isExecuteCalled());
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+
+    }
+
+    /**
+     * Tests the case where there's an outstanding operation being executed when the cache is
+     * shut down. See CURATOR-121: this was causing misleading warning messages to be logged.
+     */
+    //@Test
+    public void testInterruptedOperationOnShutdown() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
+        client.start();
+
+        try
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
+                @Override
+                protected void handleException(Throwable e)
+                {
+                    latch.countDown();
+                }
+            } )
+            {
+                cache.start();
+
+/*
+                cache.offerOperation(new Operation()
+                {
+
+                    @Override
+                    public void invoke() throws Exception
+                    {
+                        Thread.sleep(5000);
+                    }
+                });
+*/
+
+                Thread.sleep(1000);
+
+            }
+
+            latch.await(5, TimeUnit.SECONDS);
+
+            Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}


[2/2] curator git commit: continued work on porting old PathChildrenCache tests

Posted by ra...@apache.org.
continued work on porting old PathChildrenCache tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bf73f0d3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bf73f0d3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bf73f0d3

Branch: refs/heads/persistent-watch
Commit: bf73f0d3999bfc21b1799ce0c9d3e06214479206
Parents: 01652ce
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 12:03:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 12:03:41 2016 -0500

----------------------------------------------------------------------
 .../framework/recipes/watch/CuratorCache.java   |  27 +-
 .../recipes/watch/CuratorCacheBase.java         |  57 +-
 .../recipes/watch/InternalCuratorCache.java     |  79 +-
 .../recipes/watch/InternalNodeCache.java        |  53 +-
 .../recipes/watch/PersistentWatcher.java        |  23 +-
 .../framework/recipes/watch/Refresher.java      |  18 +-
 .../cache/TestSingleLevelCuratorCache.java      | 996 -------------------
 .../watch/TestSingleLevelCuratorCache.java      | 961 ++++++++++++++++++
 8 files changed, 1111 insertions(+), 1103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index bc05270..c8a6ea2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * General interface for client-cached nodes. Create instances
@@ -34,8 +34,10 @@ public interface CuratorCache extends Closeable
 {
     /**
      * Start the cache
+     *
+     * @return a latch that can be used to block until the initial refresh has completed
      */
-    void start();
+    CountDownLatch start();
 
     @Override
     void close();
@@ -48,20 +50,21 @@ public interface CuratorCache extends Closeable
     Listenable<CacheListener> getListenable();
 
     /**
-     * force-fill the cache by getting all applicable nodes. The returned future
+     * force-fill the cache by getting all applicable nodes. The returned latch
      * can be used to check/block for completion.
      *
-     * @return a future that signals when the refresh is complete
+     * @return a latch that signals when the refresh is complete
      */
-    Future<Boolean> refreshAll();
+    CountDownLatch refreshAll();
 
     /**
-     * Refresh the given cached node
+     * Refresh the given cached node. The returned latch
+     * can be used to check/block for completion.
      *
      * @param path node full path
-     * @return a future that signals when the refresh is complete
+     * @return a latch that signals when the refresh is complete
      */
-    Future<Boolean> refresh(String path);
+    CountDownLatch refresh(String path);
 
     /**
      * Remove the given path from the cache.
@@ -149,4 +152,12 @@ public interface CuratorCache extends Closeable
      * @return true if the data was cleared
      */
     boolean clearDataBytes(String path, int ifVersion);
+
+    /**
+     * Returns the number of times this cache has been refreshed (manually via one of the refresh()
+     * methods, from starting, from connection problems, etc.).
+     *
+     * @return number of refreshes
+     */
+    long refreshCount();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 4f4427a..fcab38c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -26,16 +27,25 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 abstract class CuratorCacheBase implements CuratorCache
 {
     protected final Cache<String, CachedNode> cache;
-    protected final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
-    protected final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+    private final AtomicReference<CountDownLatch> initialRefreshLatch = new AtomicReference<>(new CountDownLatch(1));
     private final boolean sendRefreshEvents;
+    private final AtomicInteger refreshCount = new AtomicInteger(0);
 
-    protected enum State
+    protected boolean isStarted()
+    {
+        return state.get() == State.STARTED;
+    }
+
+    private enum State
     {
         LATENT,
         STARTED,
@@ -142,6 +152,47 @@ abstract class CuratorCacheBase implements CuratorCache
         return false;
     }
 
+    @Override
+    public long refreshCount()
+    {
+        return refreshCount.get();
+    }
+
+    @Override
+    public final CountDownLatch start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+
+        internalStart();
+        return initialRefreshLatch.get();
+    }
+
+    @Override
+    public final void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            internalClose();
+            listeners.clear();
+            cache.invalidateAll();
+            cache.cleanUp();
+        }
+    }
+
+    protected abstract void internalClose();
+
+    protected abstract void internalStart();
+
+    void incrementRefreshCount()
+    {
+        refreshCount.incrementAndGet();
+        CountDownLatch latch = initialRefreshLatch.getAndSet(null);
+        if ( latch != null )
+        {
+            latch.countDown();
+        }
+    }
+
     void notifyListeners(final CacheEvent eventType, final String path)
     {
         if ( state.get() != State.STARTED )

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 14db725..ca83e9c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -18,22 +18,21 @@
  */
 package org.apache.curator.framework.recipes.watch;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import java.util.Objects;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 class InternalCuratorCache extends CuratorCacheBase implements Watcher
@@ -43,7 +42,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     private final String basePath;
     private final CacheFilter cacheFilter;
     private final RefreshFilter refreshFilter;
-    private final boolean refreshOnStart;
     private static final CachedNode nullNode = new CachedNode();
     private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
     {
@@ -53,52 +51,38 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
             return false;
         }
     };
-    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState newState)
-        {
-            if ( newState.isConnected() )
-            {
-                internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
-            }
-        }
-    };
 
-    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, boolean refreshOnStart)
+    InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart)
     {
         super(cache, sendRefreshEvents);
         this.client = Objects.requireNonNull(client, "client cannot be null");
         basePath = Objects.requireNonNull(path, "path cannot be null");
         this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
         this.refreshFilter = Objects.requireNonNull(refreshFilter, "primingFilter cannot be null");
-        this.refreshOnStart = refreshOnStart;
-        watcher = new PersistentWatcher(client, path);
+        watcher = new PersistentWatcher(client, path)
+        {
+            @Override
+            protected void noteWatcherReset()
+            {
+                if ( refreshOnStart || (refreshCount() > 0) )
+                {
+                    internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
+                }
+            }
+        };
         watcher.getListenable().addListener(this);
     }
 
     @Override
-    public void start()
+    protected void internalStart()
     {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
         watcher.start();
-        client.getConnectionStateListenable().addListener(connectionStateListener);
-        if ( refreshOnStart )
-        {
-            internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
-        }
     }
 
     @Override
-    public void close()
+    protected void internalClose()
     {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            client.getConnectionStateListenable().removeListener(connectionStateListener);
-            watcher.getListenable().removeListener(this);
-            listeners.clear();
-            watcher.close();
-        }
+        watcher.close();
     }
 
     @Override
@@ -131,29 +115,32 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
     }
 
     @Override
-    public Future<Boolean> refreshAll()
+    public CountDownLatch refreshAll()
     {
         return refresh(basePath);
     }
 
     @Override
-    public Future<Boolean> refresh(String path)
+    public CountDownLatch refresh(String path)
     {
         Preconditions.checkArgument(path.startsWith(basePath), "Path is not this cache's tree: " + path);
 
-        if ( state.get() == State.STARTED )
+        if ( isStarted() )
         {
-            SettableFuture<Boolean> task = SettableFuture.create();
-            Refresher refresher = new Refresher(this, path, task);
+            CountDownLatch latch = new CountDownLatch(1);
+            Refresher refresher = new Refresher(this, path, latch);
             internalRefresh(path, refresher, refreshFilter);
-            return task;
+            return latch;
         }
-        return Futures.immediateFuture(true);
+        return new CountDownLatch(0);
     }
 
+    @VisibleForTesting
+    volatile Exchanger<Object> rebuildTestExchanger;
+
     private void internalRefresh(final String path, final Refresher refresher, final RefreshFilter refreshFilter)
     {
-        if ( state.get() != State.STARTED )
+        if ( !isStarted() )
         {
             return;
         }
@@ -191,6 +178,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                     // TODO
                 }
                 refresher.decrement();
+                if ( rebuildTestExchanger != null )
+                {
+                    rebuildTestExchanger.exchange(new Object());
+                }
             }
         };
 
@@ -208,6 +199,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
                 {
                     notifyListeners(CacheEvent.NODE_CREATED, path);
                 }
+                else
+                {
+                    notifyListeners(CacheEvent.NODE_CHANGED, path);
+                }
                 break;
             }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index c2624b7..3860679 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -19,7 +19,6 @@
 package org.apache.curator.framework.recipes.watch;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import org.apache.curator.framework.CuratorFramework;
@@ -36,8 +35,8 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Exchanger;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -104,33 +103,27 @@ class InternalNodeCache extends CuratorCacheBase
     }
 
     @Override
-    public void start()
+    protected void internalStart()
     {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
-
         client.getConnectionStateListenable().addListener(connectionStateListener);
         refreshAll();
     }
 
     @Override
-    public void close()
+    protected void internalClose()
     {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            client.removeWatchers();
-            listeners.clear();
-            client.getConnectionStateListenable().removeListener(connectionStateListener);
-        }
+        client.removeWatchers();
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
     }
 
     @Override
-    public Future<Boolean> refreshAll()
+    public CountDownLatch refreshAll()
     {
         return null;    // TODO
     }
 
     @Override
-    public Future<Boolean> refresh(String path)
+    public CountDownLatch refresh(String path)
     {
         return null;    // TODO
     }
@@ -156,7 +149,7 @@ class InternalNodeCache extends CuratorCacheBase
 
     private void reset(Refresher refresher) throws Exception
     {
-        if ( (state.get() == State.STARTED) && isConnected.get() )
+        if ( isStarted() && isConnected.get() )
         {
             refresher.increment();
             client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);
@@ -229,15 +222,15 @@ class InternalNodeCache extends CuratorCacheBase
         CachedNode previousData = data.getAndSet(newData);
         if ( newData == null )
         {
-            notifyListeners(CacheEvent.NODE_DELETED);
+            notifyListeners(CacheEvent.NODE_DELETED, path);
         }
         else if ( previousData == null )
         {
-            notifyListeners(CacheEvent.NODE_CREATED);
+            notifyListeners(CacheEvent.NODE_CREATED, path);
         }
         else if ( !previousData.equals(newData) )
         {
-            notifyListeners(CacheEvent.NODE_CHANGED);
+            notifyListeners(CacheEvent.NODE_CHANGED, path);
         }
 
         if ( rebuildTestExchanger != null )
@@ -252,28 +245,4 @@ class InternalNodeCache extends CuratorCacheBase
             }
         }
     }
-
-    private void notifyListeners(final CacheEvent event)
-    {
-        listeners.forEach
-        (
-            new Function<CacheListener, Void>()
-            {
-                @Override
-                public Void apply(CacheListener listener)
-                {
-                    try
-                    {
-                        listener.process(event, path);
-                    }
-                    catch ( Exception e )
-                    {
-                        ThreadUtils.checkInterrupted(e);
-                        log.error("Calling listener", e);
-                    }
-                    return null;
-                }
-            }
-        );
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index bc5ae7e..9bee7b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -21,10 +21,14 @@ package org.apache.curator.framework.recipes.watch;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import java.io.Closeable;
@@ -65,6 +69,17 @@ public class PersistentWatcher implements Closeable
     };
     private final CuratorFramework client;
     private final String basePath;
+    private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+    {
+        @Override
+        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+        {
+            if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+            {
+                noteWatcherReset();
+            }
+        }
+    };
 
     private enum State
     {
@@ -100,6 +115,7 @@ public class PersistentWatcher implements Closeable
             {
                 // TODO
             }
+            listeners.clear();
         }
     }
 
@@ -108,11 +124,16 @@ public class PersistentWatcher implements Closeable
         return listeners;
     }
 
+    protected void noteWatcherReset()
+    {
+        // provided for sub-classes to override
+    }
+
     private void reset()
     {
         try
         {
-            client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath);
+            client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
         }
         catch ( Exception e )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
index 7169c01..e81b475 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
@@ -18,14 +18,14 @@
  */
 package org.apache.curator.framework.recipes.watch;
 
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 class Refresher
 {
     private final CuratorCacheBase cacheBase;
     private final String refreshPath;
-    private final SettableFuture<Boolean> task;
+    private final CountDownLatch latch;
     private final AtomicInteger count = new AtomicInteger(0);
 
     public Refresher(CuratorCacheBase cacheBase, String refreshPath)
@@ -33,12 +33,12 @@ class Refresher
         this(cacheBase, refreshPath, null);
     }
 
-    Refresher(CuratorCacheBase cacheBase, String refreshPath, SettableFuture<Boolean> task)
+    Refresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
     {
 
         this.cacheBase = cacheBase;
         this.refreshPath = refreshPath;
-        this.task = task;
+        this.latch = latch;
     }
 
     void increment()
@@ -51,15 +51,11 @@ class Refresher
         if ( count.decrementAndGet() <= 0 )
         {
             cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath);
-            if ( task != null )
+            if ( latch != null )
             {
-                task.set(true);
+                latch.countDown();
             }
+            cacheBase.incrementRefreshCount();
         }
     }
-
-    boolean isCancelled()
-    {
-        return (task != null) && task.isCancelled();
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
deleted file mode 100644
index 6bf0e9e..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
+++ /dev/null
@@ -1,996 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.recipes.cache;
-
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.TestCleanState;
-import org.apache.curator.framework.recipes.watch.CacheEvent;
-import org.apache.curator.framework.recipes.watch.CacheListener;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
-import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
-import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.testng.AssertJUnit.assertNotNull;
-
-public class TestSingleLevelCuratorCache extends BaseClassForTests
-{
-    private static final Timing timing = new Timing();
-
-    @Test
-    public void testWithBadConnect() throws Exception
-    {
-        final int serverPort = server.getPort();
-        server.close();
-
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            final CuratorCache cache = CuratorCacheBuilder.builder(client, "/").forSingleLevel().build();
-            final CountDownLatch addedLatch = new CountDownLatch(1);
-            CacheListener listener = new CacheListener()
-            {
-                @Override
-                public void process(CacheEvent event, String path)
-                {
-                    if ( (event == CacheEvent.NODE_CREATED) && path.equals("/baz"))
-                    {
-                        addedLatch.countDown();
-                    }
-                }
-            };
-            cache.getListenable().addListener(listener);
-            cache.start();
-
-            final CountDownLatch connectedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
-            {
-                
-                @Override
-                public void stateChanged(CuratorFramework client, ConnectionState newState)
-                {
-                    if(newState == ConnectionState.CONNECTED)
-                    {
-                        connectedLatch.countDown();
-                    }
-                }
-            });
-
-            server = new TestingServer(serverPort, true);
-            
-            Assert.assertTrue(timing.awaitLatch(connectedLatch));
-
-            client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
-
-            assertNotNull("/baz does not exist", client.checkExists().forPath("/baz"));
-
-            Assert.assertTrue(timing.awaitLatch(addedLatch));
-
-            assertNotNull("cache doesn't see /baz", cache.get("/baz").getData());
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(client);
-        }
-    }
-
-    @Test
-    public void testPostInitializedForEmpty() throws Exception
-    {
-        CuratorCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            final CountDownLatch latch = new CountDownLatch(1);
-            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
-            cache.getListenable().addListener(new CacheListener()
-            {
-                @Override
-                public void process(CacheEvent event, String path)
-                {
-                    if ( event == CacheEvent.CACHE_REFRESHED )
-                    {
-                        latch.countDown();
-                    }
-                }
-            });
-            cache.start();
-            Assert.assertTrue(timing.awaitLatch(latch));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    @Test
-    public void testAsyncInitialPopulation() throws Exception
-    {
-        CuratorCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-
-            client.create().forPath("/test");
-            client.create().forPath("/test/one", "hey there".getBytes());
-
-            final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>();
-            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
-            cache.getListenable().addListener(new CacheListener()
-            {
-                @Override
-                public void process(CacheEvent event, String path)
-                {
-                    events.offer(event);
-                }
-            });
-            cache.start();
-            Future<Boolean> task = cache.refreshAll();
-
-            CacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-            Assert.assertEquals(event, CacheEvent.NODE_CREATED);
-
-            Assert.assertNotNull(task.get(timing.forWaiting().seconds(), TimeUnit.SECONDS));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    @Test
-    public void testChildrenInitialized() throws Exception
-    {
-        CuratorCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-            client.create().forPath("/test");
-
-            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
-
-            final CountDownLatch addedLatch = new CountDownLatch(3);
-            final CountDownLatch initLatch = new CountDownLatch(1);
-            cache.getListenable().addListener(new CacheListener()
-            {
-                @Override
-                public void process(CacheEvent event, String path)
-                {
-                    if ( event == CacheEvent.NODE_CREATED )
-                    {
-                        addedLatch.countDown();
-                    }
-                    else if ( event == CacheEvent.CACHE_REFRESHED )
-                    {
-                        initLatch.countDown();
-                    }
-                }
-            });
-
-            client.create().forPath("/test/1", "1".getBytes());
-            client.create().forPath("/test/2", "2".getBytes());
-            client.create().forPath("/test/3", "3".getBytes());
-
-            cache.start();
-
-            Assert.assertTrue(timing.awaitLatch(addedLatch));
-            Assert.assertTrue(timing.awaitLatch(initLatch));
-            Assert.assertEquals(cache.size(), 3);
-            Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
-            Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
-            Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    @Test
-    public void testChildrenInitializedNormal() throws Exception
-    {
-        CuratorCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        try
-        {
-            client.start();
-            client.create().forPath("/test");
-
-            cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
-
-            final CountDownLatch addedLatch = new CountDownLatch(3);
-            cache.getListenable().addListener(new CacheListener()
-            {
-                @Override
-                public void process(CacheEvent event, String path)
-                {
-                    Assert.assertNotEquals(event, CacheEvent.CACHE_REFRESHED);
-                    if ( event == CacheEvent.NODE_CREATED )
-                    {
-                        addedLatch.countDown();
-                    }
-                }
-            });
-
-            client.create().forPath("/test/1", "1".getBytes());
-            client.create().forPath("/test/2", "2".getBytes());
-            client.create().forPath("/test/3", "3".getBytes());
-
-            cache.start();
-
-            Assert.assertTrue(timing.awaitLatch(addedLatch));
-            Assert.assertEquals(cache.size(), 3);
-            Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
-            Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
-            Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testUpdateWhenNotCachingData() throws Exception
-    {
-        PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            final CountDownLatch updatedLatch = new CountDownLatch(1);
-            final CountDownLatch addedLatch = new CountDownLatch(1);
-            client.create().creatingParentsIfNeeded().forPath("/test");
-            cache = new PathChildrenCache(client, "/test", false);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
-                            {
-                                updatedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                addedLatch.countDown();
-                            }
-                        }
-                    }
-                );
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
-            client.create().forPath("/test/foo", "first".getBytes());
-            Assert.assertTrue(timing.awaitLatch(addedLatch));
-
-            client.setData().forPath("/test/foo", "something new".getBytes());
-            Assert.assertTrue(timing.awaitLatch(updatedLatch));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testEnsurePath() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            try ( PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false) )
-            {
-                cache.start();
-                timing.sleepABit();
-
-                try
-                {
-                    client.create().forPath("/one/two/three/four");
-                }
-                catch ( KeeperException.NoNodeException e )
-                {
-                    Assert.fail("Path should exist", e);
-                }
-            }
-            timing.sleepABit();
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testDeleteThenCreate() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-            client.create().forPath("/test/foo", "one".getBytes());
-
-            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-            client.getUnhandledErrorListenable().addListener
-                (
-                    new UnhandledErrorListener()
-                    {
-                        @Override
-                        public void unhandledError(String message, Throwable e)
-                        {
-                            error.set(e);
-                        }
-                    }
-                );
-
-            final CountDownLatch removedLatch = new CountDownLatch(1);
-            final CountDownLatch postRemovedLatch = new CountDownLatch(1);
-            final CountDownLatch dataLatch = new CountDownLatch(1);
-            try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
-            {
-                cache.getListenable().addListener
-                    (
-                        new PathChildrenCacheListener()
-                        {
-                            @Override
-                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                            {
-                                if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
-                                {
-                                    removedLatch.countDown();
-                                    Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
-                                }
-                                else
-                                {
-                                    try
-                                    {
-                                        Assert.assertEquals(event.getData().getData(), "two".getBytes());
-                                    }
-                                    finally
-                                    {
-                                        dataLatch.countDown();
-                                    }
-                                }
-                            }
-                        }
-                    );
-                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
-                client.delete().forPath("/test/foo");
-                Assert.assertTrue(timing.awaitLatch(removedLatch));
-                client.create().forPath("/test/foo", "two".getBytes());
-                postRemovedLatch.countDown();
-                Assert.assertTrue(timing.awaitLatch(dataLatch));
-
-                Throwable t = error.get();
-                if ( t != null )
-                {
-                    Assert.fail("Assert", t);
-                }
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testRebuildAgainstOtherProcesses() throws Exception
-    {
-        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-            client.create().forPath("/test/foo");
-            client.create().forPath("/test/bar");
-            client.create().forPath("/test/snafu", "original".getBytes());
-
-            final CountDownLatch addedLatch = new CountDownLatch(2);
-            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
-            {
-                cache.getListenable().addListener
-                    (
-                        new PathChildrenCacheListener()
-                        {
-                            @Override
-                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                            {
-                                if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                                {
-                                    if ( event.getData().getPath().equals("/test/test") )
-                                    {
-                                        addedLatch.countDown();
-                                    }
-                                }
-                                else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
-                                {
-                                    if ( event.getData().getPath().equals("/test/snafu") )
-                                    {
-                                        addedLatch.countDown();
-                                    }
-                                }
-                            }
-                        }
-                    );
-                cache.rebuildTestExchanger = new Exchanger<Object>();
-                ExecutorService service = Executors.newSingleThreadExecutor();
-                final AtomicReference<String> deletedPath = new AtomicReference<String>();
-                Future<Object> future = service.submit
-                    (
-                        new Callable<Object>()
-                        {
-                            @Override
-                            public Object call() throws Exception
-                            {
-                                cache.rebuildTestExchanger.exchange(new Object());
-
-                                // simulate another process adding a node while we're rebuilding
-                                client.create().forPath("/test/test");
-
-                                List<ChildData> currentData = cache.getCurrentData();
-                                Assert.assertTrue(currentData.size() > 0);
-
-                                // simulate another process removing a node while we're rebuilding
-                                client.delete().forPath(currentData.get(0).getPath());
-                                deletedPath.set(currentData.get(0).getPath());
-
-                                cache.rebuildTestExchanger.exchange(new Object());
-
-                                ChildData childData = null;
-                                while ( childData == null )
-                                {
-                                    childData = cache.getCurrentData("/test/snafu");
-                                    Thread.sleep(1000);
-                                }
-                                Assert.assertEquals(childData.getData(), "original".getBytes());
-                                client.setData().forPath("/test/snafu", "grilled".getBytes());
-
-                                cache.rebuildTestExchanger.exchange(new Object());
-
-                                return null;
-                            }
-                        }
-                    );
-                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-                future.get();
-
-                Assert.assertTrue(timing.awaitLatch(addedLatch));
-                Assert.assertNotNull(cache.getCurrentData("/test/test"));
-                Assert.assertNull(cache.getCurrentData(deletedPath.get()));
-                Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data
-    //@Test
-    public void testIssue27() throws Exception
-    {
-        PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/base");
-            client.create().forPath("/base/a");
-            client.create().forPath("/base/b");
-            client.create().forPath("/base/c");
-
-            client.getChildren().forPath("/base");
-
-            final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
-            final Semaphore semaphore = new Semaphore(0);
-            cache = new PathChildrenCache(client, "/base", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            events.add(event.getType());
-                            semaphore.release();
-                        }
-                    }
-                );
-            cache.start();
-
-            Assert.assertTrue(timing.acquireSemaphore(semaphore, 3));
-
-            client.delete().forPath("/base/a");
-            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
-            client.create().forPath("/base/a");
-            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
-                (
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED,
-                    PathChildrenCacheEvent.Type.CHILD_REMOVED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED
-                );
-            Assert.assertEquals(expected, events);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    // test Issue 27 using new rebuild() method
-    //@Test
-    public void testIssue27Alt() throws Exception
-    {
-        PathChildrenCache cache = null;
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/base");
-            client.create().forPath("/base/a");
-            client.create().forPath("/base/b");
-            client.create().forPath("/base/c");
-
-            client.getChildren().forPath("/base");
-
-            final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
-            final Semaphore semaphore = new Semaphore(0);
-            cache = new PathChildrenCache(client, "/base", true);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            events.add(event.getType());
-                            semaphore.release();
-                        }
-                    }
-                );
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
-            client.delete().forPath("/base/a");
-            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
-            client.create().forPath("/base/a");
-            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
-            List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
-                (
-                    PathChildrenCacheEvent.Type.CHILD_REMOVED,
-                    PathChildrenCacheEvent.Type.CHILD_ADDED
-                );
-            Assert.assertEquals(expected, events);
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testKilledSession() throws Exception
-    {
-        PathChildrenCache cache = null;
-        CuratorFramework client = null;
-        try
-        {
-            client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-            client.start();
-            client.create().forPath("/test");
-
-            cache = new PathChildrenCache(client, "/test", true);
-            cache.start();
-
-            final CountDownLatch childAddedLatch = new CountDownLatch(1);
-            final CountDownLatch lostLatch = new CountDownLatch(1);
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch removedLatch = new CountDownLatch(1);
-            cache.getListenable().addListener
-                (
-                    new PathChildrenCacheListener()
-                    {
-                        @Override
-                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                        {
-                            if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                            {
-                                childAddedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
-                            {
-                                lostLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
-                            {
-                                reconnectedLatch.countDown();
-                            }
-                            else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
-                            {
-                                removedLatch.countDown();
-                            }
-                        }
-                    }
-                );
-
-            client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
-            Assert.assertTrue(timing.awaitLatch(childAddedLatch));
-
-            KillSession.kill(client.getZookeeperClient().getZooKeeper());
-            Assert.assertTrue(timing.awaitLatch(lostLatch));
-            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
-            Assert.assertTrue(timing.awaitLatch(removedLatch));
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    //@Test
-    public void testModes() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            for ( boolean cacheData : new boolean[]{false, true} )
-            {
-                internalTestMode(client, cacheData);
-
-                client.delete().forPath("/test/one");
-                client.delete().forPath("/test/two");
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    /**
-     * Checks that {@code rebuildNode()} refreshes the cached data of a single child and that
-     * doing so does not go through the overridden {@code getDataAndStat()} another time.
-     */
-    //@Test
-    public void testRebuildNode() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        PathChildrenCache cache = null;
-        try
-        {
-            client.start();
-            client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
-
-            final Semaphore fetchGate = new Semaphore(1);
-            final AtomicInteger fetchCount = new AtomicInteger();
-            final CountDownLatch firstFetchLatch = new CountDownLatch(1);
-            cache = new PathChildrenCache(client, "/test", true)
-            {
-                @Override
-                void getDataAndStat(String fullPath) throws Exception
-                {
-                    // the gate starts with a single permit; further calls wait until the test releases more
-                    fetchGate.acquire();
-                    fetchCount.incrementAndGet();
-                    super.getDataAndStat(fullPath);
-                    firstFetchLatch.countDown();
-                }
-            };
-            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
-            Assert.assertTrue(timing.awaitLatch(firstFetchLatch));
-
-            final int fetchesBeforeRebuild = fetchCount.get();
-            client.setData().forPath("/test/one", "alt".getBytes());
-            cache.rebuildNode("/test/one");
-            Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
-            // the counter must not have moved while the node was rebuilt
-            Assert.assertEquals(fetchesBeforeRebuild, fetchCount.get());
-
-            // hand out extra permits so any getDataAndStat() call waiting on the gate can proceed
-            fetchGate.release(1000);
-            timing.sleepABit();
-        }
-        finally
-        {
-            CloseableUtils.closeQuietly(cache);
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    /**
-     * Starts a {@link PathChildrenCache} on "/test" with the given {@code cacheData} flag,
-     * creates the children "/test/one" and "/test/two" and, once two CHILD_ADDED events have
-     * been counted, checks every entry returned by {@code getCurrentData()}: the stat must
-     * always be present, the data only when {@code cacheData} is true.
-     *
-     * @param client    started client used to create the child nodes
-     * @param cacheData value passed to the cache constructor
-     * @throws Exception on any client or cache error
-     */
-    private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
-    {
-        try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) )
-        {
-            final CountDownLatch addedLatch = new CountDownLatch(2);
-            PathChildrenCacheListener addedListener = new PathChildrenCacheListener()
-            {
-                @Override
-                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                {
-                    if ( event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED )
-                    {
-                        return;
-                    }
-                    addedLatch.countDown();
-                }
-            };
-            cache.getListenable().addListener(addedListener);
-            cache.start();
-
-            client.create().forPath("/test/one", "one".getBytes());
-            client.create().forPath("/test/two", "two".getBytes());
-            Assert.assertTrue(addedLatch.await(10, TimeUnit.SECONDS));
-
-            for ( ChildData child : cache.getCurrentData() )
-            {
-                if ( cacheData )
-                {
-                    Assert.assertNotNull(child.getData());
-                }
-                else
-                {
-                    Assert.assertNull(child.getData());
-                }
-                // the stat is expected in both modes
-                Assert.assertNotNull(child.getStat());
-            }
-        }
-    }
-
-    /**
-     * Creates, updates and deletes "/test/one" under a single-level {@link CuratorCache} on
-     * "/test" and checks that the listener sees NODE_CREATED, NODE_CHANGED and NODE_DELETED
-     * in that order, and that the cache holds the updated data after the change.
-     */
-    @Test
-    public void testBasics() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>();
-            try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
-            {
-                // only events for "/test/one" are recorded
-                CacheListener nodeOneListener = new CacheListener()
-                {
-                    @Override
-                    public void process(CacheEvent event, String path)
-                    {
-                        if ( !path.equals("/test/one") )
-                        {
-                            return;
-                        }
-                        events.offer(event);
-                    }
-                };
-                cache.getListenable().addListener(nodeOneListener);
-                cache.start();
-
-                client.create().forPath("/test/one", "hey there".getBytes());
-                CacheEvent afterCreate = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-                Assert.assertEquals(afterCreate, CacheEvent.NODE_CREATED);
-
-                client.setData().forPath("/test/one", "sup!".getBytes());
-                CacheEvent afterChange = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-                Assert.assertEquals(afterChange, CacheEvent.NODE_CHANGED);
-                Assert.assertTrue(cache.exists("/test/one"));
-                String cachedText = new String(cache.get("/test/one").getData());
-                Assert.assertEquals(cachedText, "sup!");
-
-                client.delete().forPath("/test/one");
-                CacheEvent afterDelete = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-                Assert.assertEquals(afterDelete, CacheEvent.NODE_DELETED);
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    /**
-     * Runs two {@link PathChildrenCache} instances on "/test" that are both given the same
-     * single-threaded executor, then creates, updates and deletes "/test/one" and checks that
-     * each cache reports CHILD_ADDED, CHILD_UPDATED and CHILD_REMOVED and holds the updated data.
-     */
-    //@Test
-    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<>();
-            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<>();
-            final ExecutorService exec = Executors.newSingleThreadExecutor();
-            try
-            {
-                try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-                {
-                    cache.getListenable().addListener(queueEventTypesForTestOne(events));
-                    cache.start();
-
-                    try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
-                    {
-                        cache2.getListenable().addListener(queueEventTypesForTestOne(events2));
-                        cache2.start();
-
-                        client.create().forPath("/test/one", "hey there".getBytes());
-                        Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-                        Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-
-                        client.setData().forPath("/test/one", "sup!".getBytes());
-                        Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                        Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                        Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
-                        Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
-
-                        client.delete().forPath("/test/one");
-                        Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                        Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                    }
-                }
-            }
-            finally
-            {
-                // The executor is created by this test, so this test shuts it down; otherwise its
-                // worker thread would outlive the test. Both caches are already closed at this point.
-                exec.shutdownNow();
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    /**
-     * Builds a listener that records the type of every event whose data path is "/test/one".
-     *
-     * @param queue receives the recorded event types
-     * @return the listener to register on a cache
-     */
-    private static PathChildrenCacheListener queueEventTypesForTestOne(final BlockingQueue<PathChildrenCacheEvent.Type> queue)
-    {
-        return new PathChildrenCacheListener()
-        {
-            @Override
-            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-            {
-                if ( event.getData().getPath().equals("/test/one") )
-                {
-                    queue.offer(event.getType());
-                }
-            }
-        };
-    }
-
-    /**
-     * Checks, via the flag exposed by {@code ExecuteCalledWatchingExecutorService}, that the
-     * executor given to a {@link PathChildrenCache} is used while the cache is open but is not
-     * called when the cache is closed or when a child is deleted after the close.
-     */
-    //@Test
-    public void testDeleteNodeAfterCloseDoesntCallExecutor()
-        throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        client.start();
-        try
-        {
-            client.create().forPath("/test");
-
-            // keep a handle on the real executor so it can be shut down without going through the wrapper
-            final ExecutorService backingExecutor = Executors.newSingleThreadExecutor();
-            try
-            {
-                final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(backingExecutor);
-                try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-                {
-                    cache.start();
-                    client.create().forPath("/test/one", "hey there".getBytes());
-
-                    cache.rebuild();
-                    Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
-                    Assert.assertTrue(exec.isExecuteCalled());
-
-                    // reset the flag so that only calls made from here on are detected
-                    exec.setExecuteCalled(false);
-                }
-                Assert.assertFalse(exec.isExecuteCalled());
-
-                client.delete().forPath("/test/one");
-                timing.sleepABit();
-                Assert.assertFalse(exec.isExecuteCalled());
-            }
-            finally
-            {
-                // The executor is created by this test, so this test shuts it down; otherwise its
-                // worker thread would outlive the test. This runs after the last assertion.
-                backingExecutor.shutdownNow();
-            }
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-
-    /**
-     * Covers the case where an operation is still being executed when the cache is shut
-     * down (see CURATOR-121, where this caused misleading warning messages to be logged).
-     * The test passes only if {@code handleException()} has not been invoked on the cache.
-     */
-    //@Test
-    public void testInterruptedOperationOnShutdown() throws Exception
-    {
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
-        client.start();
-
-        try
-        {
-            final CountDownLatch exceptionLatch = new CountDownLatch(1);
-            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false)
-            {
-                @Override
-                protected void handleException(Throwable e)
-                {
-                    // any reported exception trips the latch and fails the assertion at the end
-                    exceptionLatch.countDown();
-                }
-            } )
-            {
-                cache.start();
-
-                Operation slowOperation = new Operation()
-                {
-                    @Override
-                    public void invoke() throws Exception
-                    {
-                        Thread.sleep(5000);
-                    }
-                };
-                cache.offerOperation(slowOperation);
-
-                // wait a little before leaving the block, which closes the cache
-                Thread.sleep(1000);
-            }
-
-            exceptionLatch.await(5, TimeUnit.SECONDS);
-
-            boolean latchUntouched = exceptionLatch.getCount() == 1;
-            Assert.assertTrue(latchUntouched, "Unexpected exception occurred");
-        }
-        finally
-        {
-            TestCleanState.closeAndTestClean(client);
-        }
-    }
-}