You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/12/30 17:03:48 UTC
[1/2] curator git commit: continued work on porting old
PathChildrenCache tests
Repository: curator
Updated Branches:
refs/heads/persistent-watch 01652cef6 -> bf73f0d39
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
new file mode 100644
index 0000000..4bc423a
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -0,0 +1,961 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.testng.AssertJUnit.assertNotNull;
+
+public class TestSingleLevelCuratorCache extends BaseClassForTests
+{
+ private static final Timing timing = new Timing(); // shared helper: supplies the session/connection timeouts and the latch/semaphore/queue waits used by the tests below
+
+ @Test
+ public void testWithBadConnect() throws Exception // a cache started while the server is down must still report a node created once the server is back
+ {
+ final int serverPort = server.getPort();
+ server.close(); // take the server down before the client and the cache are started
+
+ CuratorCache cache = null; // declared outside the try so the finally block can close it
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1));
+ try
+ {
+ client.start();
+ cache = CuratorCacheBuilder.builder(client, "/").forSingleLevel().build();
+ final CountDownLatch addedLatch = new CountDownLatch(1); // released when the cache reports NODE_CREATED for "/baz"
+ CacheListener listener = new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( (event == CacheEvent.NODE_CREATED) && path.equals("/baz"))
+ {
+ addedLatch.countDown();
+ }
+ }
+ };
+ cache.getListenable().addListener(listener);
+ cache.start();
+
+ final CountDownLatch connectedLatch = new CountDownLatch(1); // released on the client's CONNECTED state change
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == ConnectionState.CONNECTED)
+ {
+ connectedLatch.countDown();
+ }
+ }
+ });
+
+ server = new TestingServer(serverPort, true); // bring a server back on the port the client was configured with
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+ client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
+
+ assertNotNull("/baz does not exist", client.checkExists().forPath("/baz"));
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+ assertNotNull("cache doesn't see /baz", cache.get("/baz").getData());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache); // release the cache before its client, as the other tests in this class do
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testPostInitializedForEmpty() throws Exception // expects a CACHE_REFRESHED event although this test creates no nodes
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final CountDownLatch latch = new CountDownLatch(1); // released by the first CACHE_REFRESHED event
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( event == CacheEvent.CACHE_REFRESHED )
+ {
+ latch.countDown();
+ }
+ }
+ });
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache); // cache first, then the client
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testAsyncInitialPopulation() throws Exception // a node that exists before start() must surface as NODE_CREATED, and the refreshAll() latch must complete
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ client.create().forPath("/test");
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>(); // every event the listener receives, in arrival order
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build(); // built with refresh events switched off
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ events.offer(event);
+ }
+ });
+ cache.start();
+ CountDownLatch latch = cache.refreshAll();
+
+ CacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); // null on timeout, which fails the assertion below
+ Assert.assertEquals(event, CacheEvent.NODE_CREATED);
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testChildrenInitialized() throws Exception // children created before start() must produce three NODE_CREATED events plus a CACHE_REFRESHED
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath("/test");
+
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
+
+ final CountDownLatch addedLatch = new CountDownLatch(3); // one count per expected NODE_CREATED
+ final CountDownLatch initLatch = new CountDownLatch(1); // released by CACHE_REFRESHED
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( event == CacheEvent.NODE_CREATED )
+ {
+ addedLatch.countDown();
+ }
+ else if ( event == CacheEvent.CACHE_REFRESHED )
+ {
+ initLatch.countDown();
+ }
+ }
+ });
+
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+ client.create().forPath("/test/3", "3".getBytes());
+
+ cache.start(); // started only after the three children already exist
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+ Assert.assertTrue(timing.awaitLatch(initLatch));
+ Assert.assertEquals(cache.size(), 3);
+ Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
+ Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
+ Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testChildrenInitializedNormal() throws Exception // same scenario as testChildrenInitialized, but with refresh events switched off
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath("/test");
+
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
+
+ final CountDownLatch addedLatch = new CountDownLatch(3); // one count per expected NODE_CREATED
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ Assert.assertNotEquals(event, CacheEvent.CACHE_REFRESHED); // NOTE(review): runs on whatever thread delivers events; confirm a failure here actually fails the test
+ if ( event == CacheEvent.NODE_CREATED )
+ {
+ addedLatch.countDown();
+ }
+ }
+ });
+
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+ client.create().forPath("/test/3", "3".getBytes());
+
+ cache.start(); // started only after the three children already exist
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+ Assert.assertEquals(cache.size(), 3);
+ Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
+ Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
+ Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testUpdateWhenNotCachingData() throws Exception // NODE_CREATED and NODE_CHANGED must still be reported when the filter is PATH_ONLY
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ final CountDownLatch updatedLatch = new CountDownLatch(1); // released by NODE_CHANGED
+ final CountDownLatch addedLatch = new CountDownLatch(1); // released by NODE_CREATED
+ client.create().creatingParentsIfNeeded().forPath("/test");
+ SingleLevelCacheFilter cacheFilter = new SingleLevelCacheFilter("/test", CacheAction.PATH_ONLY); // presumably PATH_ONLY means node data is not stored - TODO confirm against CacheAction
+ cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().withCacheFilter(cacheFilter).build();
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( event == CacheEvent.NODE_CHANGED )
+ {
+ updatedLatch.countDown();
+ }
+ else if ( event == CacheEvent.NODE_CREATED )
+ {
+ addedLatch.countDown();
+ }
+ }
+ });
+ cache.start();
+
+ client.create().forPath("/test/foo", "first".getBytes());
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+ client.setData().forPath("/test/foo", "something new".getBytes());
+ Assert.assertTrue(timing.awaitLatch(updatedLatch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ @Test
+ public void testDeleteThenCreate() throws Exception // re-creates a node while the listener is still handling its NODE_DELETED event, then checks the cache holds the new data
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo", "one".getBytes());
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); // last throwable reported to the client's unhandled-error listener
+ client.getUnhandledErrorListenable().addListener
+ (
+ new UnhandledErrorListener()
+ {
+ @Override
+ public void unhandledError(String message, Throwable e)
+ {
+ error.set(e);
+ }
+ }
+ );
+
+ final CountDownLatch initializedLatch = new CountDownLatch(1); // released by CACHE_REFRESHED
+ final CountDownLatch removedLatch = new CountDownLatch(1); // released when the listener sees NODE_DELETED
+ final CountDownLatch postRemovedLatch = new CountDownLatch(1); // released by the test thread after it has re-created the node
+ final CountDownLatch dataLatch = new CountDownLatch(1); // released after the listener has checked the re-created node's data
+ try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+ {
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( event == CacheEvent.CACHE_REFRESHED )
+ {
+ initializedLatch.countDown();
+ }
+ else if ( initializedLatch.getCount() == 0 ) // events that arrive before the first CACHE_REFRESHED are ignored
+ {
+ if ( event == CacheEvent.NODE_DELETED )
+ {
+ removedLatch.countDown();
+ Assert.assertTrue(timing.awaitLatch(postRemovedLatch)); // blocks this callback until the test thread has re-created the node
+ }
+ else
+ {
+ try
+ {
+ Assert.assertEquals(cache.get(path).getData(), "two".getBytes()); // any other post-refresh event must already see the new data
+ }
+ finally
+ {
+ dataLatch.countDown(); // counted down even if the assertion throws, so the test thread is not left waiting
+ }
+ }
+ }
+ }
+ });
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+ client.delete().forPath("/test/foo");
+ Assert.assertTrue(timing.awaitLatch(removedLatch));
+ client.create().forPath("/test/foo", "two".getBytes());
+ postRemovedLatch.countDown(); // lets the blocked NODE_DELETED callback return
+ Assert.assertTrue(timing.awaitLatch(dataLatch));
+
+ Throwable t = error.get();
+ if ( t != null )
+ {
+ Assert.fail("Assert", t);
+ }
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testRebuildAgainstOtherProcesses() throws Exception // not run: the @Test annotation above is commented out
+ {
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+ client.create().forPath("/test/foo");
+ client.create().forPath("/test/bar");
+ client.create().forPath("/test/snafu", "original".getBytes());
+
+ final CountDownLatch addedLatch = new CountDownLatch(2); // one count for NODE_CREATED on "/test/test", one for NODE_CHANGED on "/test/snafu"
+ try ( final CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+ {
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( event == CacheEvent.NODE_CREATED )
+ {
+ if ( path.equals("/test/test") )
+ {
+ addedLatch.countDown();
+ }
+ }
+ else if ( event == CacheEvent.NODE_CHANGED )
+ {
+ if ( path.equals("/test/snafu") )
+ {
+ addedLatch.countDown();
+ }
+ }
+ }
+ });
+ ((InternalCuratorCache)cache).rebuildTestExchanger = new Exchanger<Object>(); // presumably the cache exchanges on this hook while rebuilding - TODO confirm in InternalCuratorCache
+ ExecutorService service = Executors.newSingleThreadExecutor(); // NOTE(review): never shut down in this method
+ final AtomicReference<String> deletedPath = new AtomicReference<String>();
+ Future<Object> future = service.submit
+ (
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+ // simulate another process adding a node while we're rebuilding
+ client.create().forPath("/test/test");
+
+ Assert.assertTrue(cache.size() > 0);
+
+ // simulate another process removing a node while we're rebuilding
+ client.delete().forPath("/test/bar");
+ deletedPath.set("/test/bar");
+
+ ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+ CachedNode cachedNode = null;
+ while ( cachedNode == null ) // poll until the cache holds "/test/snafu"; no upper bound on the wait
+ {
+ cachedNode = cache.get("/test/snafu");
+ timing.sleepABit();
+ }
+ Assert.assertEquals(cachedNode.getData(), "original".getBytes());
+ client.setData().forPath("/test/snafu", "grilled".getBytes());
+
+ ((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+ return null;
+ }
+ }
+ );
+ cache.start();
+ future.get(); // rethrows (wrapped) any failure from the background task
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+ Assert.assertNotNull(cache.get("/test/test"));
+ Assert.assertNull(cache.get(deletedPath.get()));
+ Assert.assertEquals(cache.get("/test/snafu").getData(), "grilled".getBytes());
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data
+ @Test
+ public void testIssue27() throws Exception // deleting and re-creating a child must yield exactly NODE_DELETED then NODE_CREATED after the three initial NODE_CREATED events
+ {
+ CuratorCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/base");
+ client.create().forPath("/base/a");
+ client.create().forPath("/base/b");
+ client.create().forPath("/base/c");
+
+ client.getChildren().forPath("/base"); // result is discarded
+
+ final List<CacheEvent> events = Lists.newArrayList(); // appended to by the listener, read by the test thread after the semaphore acquires
+ final Semaphore semaphore = new Semaphore(0); // one permit is released per delivered event
+ cache = CuratorCacheBuilder.builder(client, "/base").forSingleLevel().sendingRefreshEvents(false).build();
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ events.add(event);
+ semaphore.release();
+ }
+ });
+ cache.start();
+
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 3)); // the three pre-existing children
+
+ client.delete().forPath("/base/a");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+ client.create().forPath("/base/a");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+ List<CacheEvent> expected = Lists.newArrayList
+ (
+ CacheEvent.NODE_CREATED,
+ CacheEvent.NODE_CREATED,
+ CacheEvent.NODE_CREATED,
+ CacheEvent.NODE_DELETED,
+ CacheEvent.NODE_CREATED
+ );
+ Assert.assertEquals(events, expected); // TestNG argument order is (actual, expected)
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ // test Issue 27 again, recording only the events delivered once refreshCount() is above zero
+ @Test
+ public void testIssue27Alt() throws Exception // deleting and re-creating a child after the initial refresh must yield exactly NODE_DELETED then NODE_CREATED
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/base");
+ client.create().forPath("/base/a");
+ client.create().forPath("/base/b");
+ client.create().forPath("/base/c");
+
+ client.getChildren().forPath("/base"); // result is discarded
+
+ final List<CacheEvent> events = Lists.newArrayList(); // appended to by the listener, read by the test thread after the semaphore acquires
+ final Semaphore semaphore = new Semaphore(0); // one permit is released per recorded event
+ try ( final CuratorCache cache = CuratorCacheBuilder.builder(client, "/base").forSingleLevel().sendingRefreshEvents(false).build() )
+ {
+ cache.getListenable().addListener(new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( cache.refreshCount() > 0 ) // skip events delivered while the refresh count is still zero
+ {
+ events.add(event);
+ semaphore.release();
+ }
+ }
+ });
+ Assert.assertTrue(timing.awaitLatch(cache.start())); // start() returns a latch; wait on it before changing any nodes
+
+ client.delete().forPath("/base/a");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+ client.create().forPath("/base/a");
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+ List<CacheEvent> expected = Lists.newArrayList(CacheEvent.NODE_DELETED, CacheEvent.NODE_CREATED);
+ Assert.assertEquals(events, expected); // TestNG argument order is (actual, expected)
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testKilledSession() throws Exception // not run: @Test is commented out; the body still uses PathChildrenCache rather than CuratorCache
+ {
+ PathChildrenCache cache = null;
+ CuratorFramework client = null;
+ try
+ {
+ client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ client.create().forPath("/test");
+
+ cache = new PathChildrenCache(client, "/test", true);
+ cache.start();
+
+ final CountDownLatch childAddedLatch = new CountDownLatch(1); // CHILD_ADDED
+ final CountDownLatch lostLatch = new CountDownLatch(1); // CONNECTION_LOST
+ final CountDownLatch reconnectedLatch = new CountDownLatch(1); // CONNECTION_RECONNECTED
+ final CountDownLatch removedLatch = new CountDownLatch(1); // CHILD_REMOVED
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ {
+ childAddedLatch.countDown();
+ }
+ else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
+ {
+ lostLatch.countDown();
+ }
+ else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+ {
+ reconnectedLatch.countDown();
+ }
+ else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+ {
+ removedLatch.countDown();
+ }
+ }
+ }
+ );
+
+ client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes()); // ephemeral node, expected to be reported removed after the session is killed
+ Assert.assertTrue(timing.awaitLatch(childAddedLatch));
+
+ KillSession.kill(client.getZookeeperClient().getZooKeeper());
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+ Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+ Assert.assertTrue(timing.awaitLatch(removedLatch));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testModes() throws Exception // not run: @Test is commented out; drives internalTestMode once with cacheData=false and once with cacheData=true
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+
+ for ( boolean cacheData : new boolean[]{false, true} )
+ {
+ internalTestMode(client, cacheData);
+
+ client.delete().forPath("/test/one"); // remove the nodes internalTestMode created so the next pass can create them again
+ client.delete().forPath("/test/two");
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testRebuildNode() throws Exception // not run: @Test is commented out; the body still uses PathChildrenCache rather than CuratorCache
+ {
+ PathChildrenCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
+
+ final CountDownLatch latch = new CountDownLatch(1); // released the first time getDataAndStat() below runs
+ final AtomicInteger counter = new AtomicInteger(); // number of getDataAndStat() invocations
+ final Semaphore semaphore = new Semaphore(1); // single permit: a second getDataAndStat() call blocks until release(1000) below
+ cache = new PathChildrenCache(client, "/test", true)
+ {
+ //@Override
+ void getDataAndStat(String fullPath) throws Exception // NOTE(review): @Override and the super call are commented out, so this may no longer override anything - confirm
+ {
+ semaphore.acquire();
+ counter.incrementAndGet();
+ //super.getDataAndStat(fullPath);
+ latch.countDown();
+ }
+ };
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+ Assert.assertTrue(timing.awaitLatch(latch));
+
+ int saveCounter = counter.get();
+ client.setData().forPath("/test/one", "alt".getBytes());
+ cache.rebuildNode("/test/one");
+ Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
+ Assert.assertEquals(saveCounter, counter.get()); // the counter must not have moved across rebuildNode()
+
+ semaphore.release(1000);
+ timing.sleepABit();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception // helper for testModes: creates /test/one and /test/two and checks data is present only when cacheData is true
+ {
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) )
+ {
+ final CountDownLatch latch = new CountDownLatch(2); // one count per CHILD_ADDED
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ {
+ latch.countDown();
+ }
+ }
+ }
+ );
+ cache.start();
+
+ client.create().forPath("/test/one", "one".getBytes());
+ client.create().forPath("/test/two", "two".getBytes());
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); // NOTE(review): fixed 10-second wait; the other tests wait through the shared timing helper
+
+ for ( ChildData data : cache.getCurrentData() )
+ {
+ if ( cacheData )
+ {
+ Assert.assertNotNull(data.getData());
+ Assert.assertNotNull(data.getStat());
+ }
+ else
+ {
+ Assert.assertNull(data.getData()); // no payload expected when the cache was built with cacheData=false
+ Assert.assertNotNull(data.getStat());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testBasics() throws Exception // create, update and delete of one child must produce NODE_CREATED, NODE_CHANGED and NODE_DELETED in that order
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+
+ final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>(); // events for "/test/one" only, in arrival order
+ try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+ {
+ cache.getListenable().addListener
+ (
+ new CacheListener()
+ {
+ @Override
+ public void process(CacheEvent event, String path)
+ {
+ if ( path.equals("/test/one") )
+ {
+ events.offer(event);
+ }
+ }
+ }
+ );
+ cache.start();
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_CREATED); // poll() yields null on timeout, which fails the comparison
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_CHANGED);
+ Assert.assertTrue(cache.exists("/test/one"));
+ Assert.assertEquals(new String(cache.get("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), CacheEvent.NODE_DELETED);
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testBasicsOnTwoCachesWithSameExecutor() throws Exception // not run: @Test is commented out; the body still uses PathChildrenCache rather than CuratorCache
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+
+ final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); // event types seen by the first cache for "/test/one"
+ final ExecutorService exec = Executors.newSingleThreadExecutor(); // handed to both caches; NOTE(review): never shut down in this method
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ if ( event.getData().getPath().equals("/test/one") )
+ {
+ events.offer(event.getType());
+ }
+ }
+ }
+ );
+ cache.start();
+
+ final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>(); // event types seen by the second cache for "/test/one"
+ try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache2.getListenable().addListener(
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ throws Exception
+ {
+ if ( event.getData().getPath().equals("/test/one") )
+ {
+ events2.offer(event.getType());
+ }
+ }
+ }
+ );
+ cache2.start();
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+ Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
+ }
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+
+ //@Test
+ public void testDeleteNodeAfterCloseDoesntCallExecutor() // not run: @Test is commented out; the body still uses PathChildrenCache rather than CuratorCache
+ throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ client.create().forPath("/test");
+
+ final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); // wrapper exposing an execute-called flag; NOTE(review): the wrapped executor is never shut down here
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache.start();
+ client.create().forPath("/test/one", "hey there".getBytes());
+
+ cache.rebuild();
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+ Assert.assertTrue(exec.isExecuteCalled());
+
+ exec.setExecuteCalled(false); // reset the flag just before the cache is closed
+ }
+ Assert.assertFalse(exec.isExecuteCalled()); // closing the cache must not have used the executor
+
+ client.delete().forPath("/test/one");
+ timing.sleepABit(); // give any stray watcher callback time to fire before the final check
+ Assert.assertFalse(exec.isExecuteCalled());
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+
+ }
+
+ /**
+ * Tests the case where there's an outstanding operation being executed when the cache is
+ * shut down. See CURATOR-121, this was causing misleading warning messages to be logged.
+ */
+ //@Test
+ public void testInterruptedOperationOnShutdown() throws Exception // not run: @Test is commented out, and the slow operation it used to queue is commented out below
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
+ client.start();
+
+ try
+ {
+ final CountDownLatch latch = new CountDownLatch(1); // counted down only if the cache's handleException() is invoked
+ try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
+ @Override
+ protected void handleException(Throwable e)
+ {
+ latch.countDown();
+ }
+ } )
+ {
+ cache.start();
+
+/*
+ cache.offerOperation(new Operation()
+ {
+
+ @Override
+ public void invoke() throws Exception
+ {
+ Thread.sleep(5000);
+ }
+ });
+*/
+
+ Thread.sleep(1000);
+
+ }
+
+ latch.await(5, TimeUnit.SECONDS); // return value ignored on purpose: timing out here is the passing case
+
+ Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred"); // count still 1 means handleException() was never called
+ }
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ }
+}
[2/2] curator git commit: continued work on porting old
PathChildrenCache tests
Posted by ra...@apache.org.
continued work on porting old PathChildrenCache tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bf73f0d3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bf73f0d3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bf73f0d3
Branch: refs/heads/persistent-watch
Commit: bf73f0d3999bfc21b1799ce0c9d3e06214479206
Parents: 01652ce
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 30 12:03:41 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 30 12:03:41 2016 -0500
----------------------------------------------------------------------
.../framework/recipes/watch/CuratorCache.java | 27 +-
.../recipes/watch/CuratorCacheBase.java | 57 +-
.../recipes/watch/InternalCuratorCache.java | 79 +-
.../recipes/watch/InternalNodeCache.java | 53 +-
.../recipes/watch/PersistentWatcher.java | 23 +-
.../framework/recipes/watch/Refresher.java | 18 +-
.../cache/TestSingleLevelCuratorCache.java | 996 -------------------
.../watch/TestSingleLevelCuratorCache.java | 961 ++++++++++++++++++
8 files changed, 1111 insertions(+), 1103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
index bc05270..c8a6ea2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCache.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
/**
* General interface for client-cached nodes. Create instances
@@ -34,8 +34,10 @@ public interface CuratorCache extends Closeable
{
/**
* Start the cache
+ *
+ * @return a latch that can be used to block until the initial refresh has completed
*/
- void start();
+ CountDownLatch start();
@Override
void close();
@@ -48,20 +50,21 @@ public interface CuratorCache extends Closeable
Listenable<CacheListener> getListenable();
/**
- * force-fill the cache by getting all applicable nodes. The returned future
+ * force-fill the cache by getting all applicable nodes. The returned latch
* can be used to check/block for completion.
*
- * @return a future that signals when the refresh is complete
+ * @return a latch that signals when the refresh is complete
*/
- Future<Boolean> refreshAll();
+ CountDownLatch refreshAll();
/**
- * Refresh the given cached node
+ * Refresh the given cached node. The returned latch
+ * can be used to check/block for completion.
*
* @param path node full path
- * @return a future that signals when the refresh is complete
+ * @return a latch that signals when the refresh is complete
*/
- Future<Boolean> refresh(String path);
+ CountDownLatch refresh(String path);
/**
* Remove the given path from the cache.
@@ -149,4 +152,12 @@ public interface CuratorCache extends Closeable
* @return true if the data was cleared
*/
boolean clearDataBytes(String path, int ifVersion);
+
+ /**
+ * Returns the number of times this cache has been refreshed (manually via one of the refresh()
+ * methods, from starting, from connection problems, etc.).
+ *
+ * @return number of refreshes
+ */
+ long refreshCount();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
index 4f4427a..fcab38c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/CuratorCacheBase.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.watch;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
@@ -26,16 +27,25 @@ import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
abstract class CuratorCacheBase implements CuratorCache
{
protected final Cache<String, CachedNode> cache;
- protected final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
- protected final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final ListenerContainer<CacheListener> listeners = new ListenerContainer<>();
+ private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
+ private final AtomicReference<CountDownLatch> initialRefreshLatch = new AtomicReference<>(new CountDownLatch(1));
private final boolean sendRefreshEvents;
+ private final AtomicInteger refreshCount = new AtomicInteger(0);
- protected enum State
+ protected boolean isStarted()
+ {
+ return state.get() == State.STARTED;
+ }
+
+ private enum State
{
LATENT,
STARTED,
@@ -142,6 +152,47 @@ abstract class CuratorCacheBase implements CuratorCache
return false;
}
+ @Override
+ public long refreshCount()
+ {
+ return refreshCount.get();
+ }
+
+ @Override
+ public final CountDownLatch start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
+
+ internalStart();
+ return initialRefreshLatch.get();
+ }
+
+ @Override
+ public final void close()
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ internalClose();
+ listeners.clear();
+ cache.invalidateAll();
+ cache.cleanUp();
+ }
+ }
+
+ protected abstract void internalClose();
+
+ protected abstract void internalStart();
+
+ void incrementRefreshCount()
+ {
+ refreshCount.incrementAndGet();
+ CountDownLatch latch = initialRefreshLatch.getAndSet(null);
+ if ( latch != null )
+ {
+ latch.countDown();
+ }
+ }
+
void notifyListeners(final CacheEvent eventType, final String path)
{
if ( state.get() != State.STARTED )
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
index 14db725..ca83e9c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalCuratorCache.java
@@ -18,22 +18,21 @@
*/
package org.apache.curator.framework.recipes.watch;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.Objects;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicInteger;
class InternalCuratorCache extends CuratorCacheBase implements Watcher
@@ -43,7 +42,6 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
private final String basePath;
private final CacheFilter cacheFilter;
private final RefreshFilter refreshFilter;
- private final boolean refreshOnStart;
private static final CachedNode nullNode = new CachedNode();
private static final RefreshFilter nopRefreshFilter = new RefreshFilter()
{
@@ -53,52 +51,38 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
return false;
}
};
- private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if ( newState.isConnected() )
- {
- internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
- }
- }
- };
- InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, boolean refreshOnStart)
+ InternalCuratorCache(CuratorFramework client, String path, CacheFilter cacheFilter, final RefreshFilter refreshFilter, Cache<String, CachedNode> cache, boolean sendRefreshEvents, final boolean refreshOnStart)
{
super(cache, sendRefreshEvents);
this.client = Objects.requireNonNull(client, "client cannot be null");
basePath = Objects.requireNonNull(path, "path cannot be null");
this.cacheFilter = Objects.requireNonNull(cacheFilter, "cacheFilter cannot be null");
this.refreshFilter = Objects.requireNonNull(refreshFilter, "primingFilter cannot be null");
- this.refreshOnStart = refreshOnStart;
- watcher = new PersistentWatcher(client, path);
+ watcher = new PersistentWatcher(client, path)
+ {
+ @Override
+ protected void noteWatcherReset()
+ {
+ if ( refreshOnStart || (refreshCount() > 0) )
+ {
+ internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
+ }
+ }
+ };
watcher.getListenable().addListener(this);
}
@Override
- public void start()
+ protected void internalStart()
{
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
watcher.start();
- client.getConnectionStateListenable().addListener(connectionStateListener);
- if ( refreshOnStart )
- {
- internalRefresh(basePath, new Refresher(InternalCuratorCache.this, basePath), refreshFilter);
- }
}
@Override
- public void close()
+ protected void internalClose()
{
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- client.getConnectionStateListenable().removeListener(connectionStateListener);
- watcher.getListenable().removeListener(this);
- listeners.clear();
- watcher.close();
- }
+ watcher.close();
}
@Override
@@ -131,29 +115,32 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
}
@Override
- public Future<Boolean> refreshAll()
+ public CountDownLatch refreshAll()
{
return refresh(basePath);
}
@Override
- public Future<Boolean> refresh(String path)
+ public CountDownLatch refresh(String path)
{
Preconditions.checkArgument(path.startsWith(basePath), "Path is not this cache's tree: " + path);
- if ( state.get() == State.STARTED )
+ if ( isStarted() )
{
- SettableFuture<Boolean> task = SettableFuture.create();
- Refresher refresher = new Refresher(this, path, task);
+ CountDownLatch latch = new CountDownLatch(1);
+ Refresher refresher = new Refresher(this, path, latch);
internalRefresh(path, refresher, refreshFilter);
- return task;
+ return latch;
}
- return Futures.immediateFuture(true);
+ return new CountDownLatch(0);
}
+ @VisibleForTesting
+ volatile Exchanger<Object> rebuildTestExchanger;
+
private void internalRefresh(final String path, final Refresher refresher, final RefreshFilter refreshFilter)
{
- if ( state.get() != State.STARTED )
+ if ( !isStarted() )
{
return;
}
@@ -191,6 +178,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
// TODO
}
refresher.decrement();
+ if ( rebuildTestExchanger != null )
+ {
+ rebuildTestExchanger.exchange(new Object());
+ }
}
};
@@ -208,6 +199,10 @@ class InternalCuratorCache extends CuratorCacheBase implements Watcher
{
notifyListeners(CacheEvent.NODE_CREATED, path);
}
+ else
+ {
+ notifyListeners(CacheEvent.NODE_CHANGED, path);
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
index c2624b7..3860679 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/InternalNodeCache.java
@@ -19,7 +19,6 @@
package org.apache.curator.framework.recipes.watch;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import org.apache.curator.framework.CuratorFramework;
@@ -36,8 +35,8 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -104,33 +103,27 @@ class InternalNodeCache extends CuratorCacheBase
}
@Override
- public void start()
+ protected void internalStart()
{
- Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
-
client.getConnectionStateListenable().addListener(connectionStateListener);
refreshAll();
}
@Override
- public void close()
+ protected void internalClose()
{
- if ( state.compareAndSet(State.STARTED, State.CLOSED) )
- {
- client.removeWatchers();
- listeners.clear();
- client.getConnectionStateListenable().removeListener(connectionStateListener);
- }
+ client.removeWatchers();
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
}
@Override
- public Future<Boolean> refreshAll()
+ public CountDownLatch refreshAll()
{
return null; // TODO
}
@Override
- public Future<Boolean> refresh(String path)
+ public CountDownLatch refresh(String path)
{
return null; // TODO
}
@@ -156,7 +149,7 @@ class InternalNodeCache extends CuratorCacheBase
private void reset(Refresher refresher) throws Exception
{
- if ( (state.get() == State.STARTED) && isConnected.get() )
+ if ( isStarted() && isConnected.get() )
{
refresher.increment();
client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback, refresher).forPath(path);
@@ -229,15 +222,15 @@ class InternalNodeCache extends CuratorCacheBase
CachedNode previousData = data.getAndSet(newData);
if ( newData == null )
{
- notifyListeners(CacheEvent.NODE_DELETED);
+ notifyListeners(CacheEvent.NODE_DELETED, path);
}
else if ( previousData == null )
{
- notifyListeners(CacheEvent.NODE_CREATED);
+ notifyListeners(CacheEvent.NODE_CREATED, path);
}
else if ( !previousData.equals(newData) )
{
- notifyListeners(CacheEvent.NODE_CHANGED);
+ notifyListeners(CacheEvent.NODE_CHANGED, path);
}
if ( rebuildTestExchanger != null )
@@ -252,28 +245,4 @@ class InternalNodeCache extends CuratorCacheBase
}
}
}
-
- private void notifyListeners(final CacheEvent event)
- {
- listeners.forEach
- (
- new Function<CacheListener, Void>()
- {
- @Override
- public Void apply(CacheListener listener)
- {
- try
- {
- listener.process(event, path);
- }
- catch ( Exception e )
- {
- ThreadUtils.checkInterrupted(e);
- log.error("Calling listener", e);
- }
- return null;
- }
- }
- );
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
index bc5ae7e..9bee7b1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java
@@ -21,10 +21,14 @@ package org.apache.curator.framework.recipes.watch;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.Closeable;
@@ -65,6 +69,17 @@ public class PersistentWatcher implements Closeable
};
private final CuratorFramework client;
private final String basePath;
+ private final BackgroundCallback backgroundCallback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ if ( (event.getType() == CuratorEventType.ADD_PERSISTENT_WATCH) && (event.getResultCode() == KeeperException.Code.OK.intValue()) )
+ {
+ noteWatcherReset();
+ }
+ }
+ };
private enum State
{
@@ -100,6 +115,7 @@ public class PersistentWatcher implements Closeable
{
// TODO
}
+ listeners.clear();
}
}
@@ -108,11 +124,16 @@ public class PersistentWatcher implements Closeable
return listeners;
}
+ protected void noteWatcherReset()
+ {
+ // provided for sub-classes to override
+ }
+
private void reset()
{
try
{
- client.addPersistentWatch().inBackground().usingWatcher(watcher).forPath(basePath);
+ client.addPersistentWatch().inBackground(backgroundCallback).usingWatcher(watcher).forPath(basePath);
}
catch ( Exception e )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
index 7169c01..e81b475 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/Refresher.java
@@ -18,14 +18,14 @@
*/
package org.apache.curator.framework.recipes.watch;
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
class Refresher
{
private final CuratorCacheBase cacheBase;
private final String refreshPath;
- private final SettableFuture<Boolean> task;
+ private final CountDownLatch latch;
private final AtomicInteger count = new AtomicInteger(0);
public Refresher(CuratorCacheBase cacheBase, String refreshPath)
@@ -33,12 +33,12 @@ class Refresher
this(cacheBase, refreshPath, null);
}
- Refresher(CuratorCacheBase cacheBase, String refreshPath, SettableFuture<Boolean> task)
+ Refresher(CuratorCacheBase cacheBase, String refreshPath, CountDownLatch latch)
{
this.cacheBase = cacheBase;
this.refreshPath = refreshPath;
- this.task = task;
+ this.latch = latch;
}
void increment()
@@ -51,15 +51,11 @@ class Refresher
if ( count.decrementAndGet() <= 0 )
{
cacheBase.notifyListeners(CacheEvent.CACHE_REFRESHED, refreshPath);
- if ( task != null )
+ if ( latch != null )
{
- task.set(true);
+ latch.countDown();
}
+ cacheBase.incrementRefreshCount();
}
}
-
- boolean isCancelled()
- {
- return (task != null) && task.isCancelled();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
deleted file mode 100644
index 6bf0e9e..0000000
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestSingleLevelCuratorCache.java
+++ /dev/null
@@ -1,996 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.recipes.cache;
-
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.TestCleanState;
-import org.apache.curator.framework.recipes.watch.CacheEvent;
-import org.apache.curator.framework.recipes.watch.CacheListener;
-import org.apache.curator.framework.recipes.watch.CuratorCache;
-import org.apache.curator.framework.recipes.watch.CuratorCacheBuilder;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
-import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.testng.AssertJUnit.assertNotNull;
-
-public class TestSingleLevelCuratorCache extends BaseClassForTests
-{
- private static final Timing timing = new Timing();
-
- @Test
- public void testWithBadConnect() throws Exception
- {
- final int serverPort = server.getPort();
- server.close();
-
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1));
- try
- {
- client.start();
-
- final CuratorCache cache = CuratorCacheBuilder.builder(client, "/").forSingleLevel().build();
- final CountDownLatch addedLatch = new CountDownLatch(1);
- CacheListener listener = new CacheListener()
- {
- @Override
- public void process(CacheEvent event, String path)
- {
- if ( (event == CacheEvent.NODE_CREATED) && path.equals("/baz"))
- {
- addedLatch.countDown();
- }
- }
- };
- cache.getListenable().addListener(listener);
- cache.start();
-
- final CountDownLatch connectedLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState == ConnectionState.CONNECTED)
- {
- connectedLatch.countDown();
- }
- }
- });
-
- server = new TestingServer(serverPort, true);
-
- Assert.assertTrue(timing.awaitLatch(connectedLatch));
-
- client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
-
- assertNotNull("/baz does not exist", client.checkExists().forPath("/baz"));
-
- Assert.assertTrue(timing.awaitLatch(addedLatch));
-
- assertNotNull("cache doesn't see /baz", cache.get("/baz").getData());
- }
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- }
-
- @Test
- public void testPostInitializedForEmpty() throws Exception
- {
- CuratorCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
-
- final CountDownLatch latch = new CountDownLatch(1);
- cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
- cache.getListenable().addListener(new CacheListener()
- {
- @Override
- public void process(CacheEvent event, String path)
- {
- if ( event == CacheEvent.CACHE_REFRESHED )
- {
- latch.countDown();
- }
- }
- });
- cache.start();
- Assert.assertTrue(timing.awaitLatch(latch));
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- @Test
- public void testAsyncInitialPopulation() throws Exception
- {
- CuratorCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
-
- client.create().forPath("/test");
- client.create().forPath("/test/one", "hey there".getBytes());
-
- final BlockingQueue<CacheEvent> events = new LinkedBlockingQueue<>();
- cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
- cache.getListenable().addListener(new CacheListener()
- {
- @Override
- public void process(CacheEvent event, String path)
- {
- events.offer(event);
- }
- });
- cache.start();
- Future<Boolean> task = cache.refreshAll();
-
- CacheEvent event = events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
- Assert.assertEquals(event, CacheEvent.NODE_CREATED);
-
- Assert.assertNotNull(task.get(timing.forWaiting().seconds(), TimeUnit.SECONDS));
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- @Test
- public void testChildrenInitialized() throws Exception
- {
- CuratorCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
- client.create().forPath("/test");
-
- cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build();
-
- final CountDownLatch addedLatch = new CountDownLatch(3);
- final CountDownLatch initLatch = new CountDownLatch(1);
- cache.getListenable().addListener(new CacheListener()
- {
- @Override
- public void process(CacheEvent event, String path)
- {
- if ( event == CacheEvent.NODE_CREATED )
- {
- addedLatch.countDown();
- }
- else if ( event == CacheEvent.CACHE_REFRESHED )
- {
- initLatch.countDown();
- }
- }
- });
-
- client.create().forPath("/test/1", "1".getBytes());
- client.create().forPath("/test/2", "2".getBytes());
- client.create().forPath("/test/3", "3".getBytes());
-
- cache.start();
-
- Assert.assertTrue(timing.awaitLatch(addedLatch));
- Assert.assertTrue(timing.awaitLatch(initLatch));
- Assert.assertEquals(cache.size(), 3);
- Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
- Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
- Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- @Test
- public void testChildrenInitializedNormal() throws Exception
- {
- CuratorCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
- client.create().forPath("/test");
-
- cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().sendingRefreshEvents(false).build();
-
- final CountDownLatch addedLatch = new CountDownLatch(3);
- cache.getListenable().addListener(new CacheListener()
- {
- @Override
- public void process(CacheEvent event, String path)
- {
- Assert.assertNotEquals(event, CacheEvent.CACHE_REFRESHED);
- if ( event == CacheEvent.NODE_CREATED )
- {
- addedLatch.countDown();
- }
- }
- });
-
- client.create().forPath("/test/1", "1".getBytes());
- client.create().forPath("/test/2", "2".getBytes());
- client.create().forPath("/test/3", "3".getBytes());
-
- cache.start();
-
- Assert.assertTrue(timing.awaitLatch(addedLatch));
- Assert.assertEquals(cache.size(), 3);
- Assert.assertEquals(cache.get("/test/1").getData(), "1".getBytes());
- Assert.assertEquals(cache.get("/test/2").getData(), "2".getBytes());
- Assert.assertEquals(cache.get("/test/3").getData(), "3".getBytes());
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- //@Test
- public void testUpdateWhenNotCachingData() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- final CountDownLatch updatedLatch = new CountDownLatch(1);
- final CountDownLatch addedLatch = new CountDownLatch(1);
- client.create().creatingParentsIfNeeded().forPath("/test");
- cache = new PathChildrenCache(client, "/test", false);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
- {
- updatedLatch.countDown();
- }
- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
- {
- addedLatch.countDown();
- }
- }
- }
- );
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
- client.create().forPath("/test/foo", "first".getBytes());
- Assert.assertTrue(timing.awaitLatch(addedLatch));
-
- client.setData().forPath("/test/foo", "something new".getBytes());
- Assert.assertTrue(timing.awaitLatch(updatedLatch));
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- //@Test
- public void testEnsurePath() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- try ( PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false) )
- {
- cache.start();
- timing.sleepABit();
-
- try
- {
- client.create().forPath("/one/two/three/four");
- }
- catch ( KeeperException.NoNodeException e )
- {
- Assert.fail("Path should exist", e);
- }
- }
- timing.sleepABit();
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- //@Test
- public void testDeleteThenCreate() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- client.create().forPath("/test");
- client.create().forPath("/test/foo", "one".getBytes());
-
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
- client.getUnhandledErrorListenable().addListener
- (
- new UnhandledErrorListener()
- {
- @Override
- public void unhandledError(String message, Throwable e)
- {
- error.set(e);
- }
- }
- );
-
- final CountDownLatch removedLatch = new CountDownLatch(1);
- final CountDownLatch postRemovedLatch = new CountDownLatch(1);
- final CountDownLatch dataLatch = new CountDownLatch(1);
- try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
- {
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
- {
- removedLatch.countDown();
- Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
- }
- else
- {
- try
- {
- Assert.assertEquals(event.getData().getData(), "two".getBytes());
- }
- finally
- {
- dataLatch.countDown();
- }
- }
- }
- }
- );
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
- client.delete().forPath("/test/foo");
- Assert.assertTrue(timing.awaitLatch(removedLatch));
- client.create().forPath("/test/foo", "two".getBytes());
- postRemovedLatch.countDown();
- Assert.assertTrue(timing.awaitLatch(dataLatch));
-
- Throwable t = error.get();
- if ( t != null )
- {
- Assert.fail("Assert", t);
- }
- }
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- /**
-  * Checks that the cache ends up in the right state when children are added, removed and
-  * changed by "another process" while the cache is being built. A helper task performs those
-  * changes between calls to {@code cache.rebuildTestExchanger.exchange()}.
-  * NOTE(review): the exchanger is presumably paired with hooks inside the cache's rebuild
-  * logic - confirm against PathChildrenCache.
-  * (Currently disabled: the {@code @Test} annotation is commented out.)
-  *
-  * @throws Exception on any ZooKeeper/Curator error or failed assertion in the helper task
-  */
- //@Test
- public void testRebuildAgainstOtherProcesses() throws Exception
- {
-     final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-     client.start();
-     try
-     {
-         client.create().forPath("/test");
-         client.create().forPath("/test/foo");
-         client.create().forPath("/test/bar");
-         client.create().forPath("/test/snafu", "original".getBytes());
-
-         // one count for the node added during the rebuild, one for the update of /test/snafu
-         final CountDownLatch addedLatch = new CountDownLatch(2);
-         try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
-         {
-             cache.getListenable().addListener
-             (
-                 new PathChildrenCacheListener()
-                 {
-                     @Override
-                     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                     {
-                         if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                         {
-                             if ( event.getData().getPath().equals("/test/test") )
-                             {
-                                 addedLatch.countDown();
-                             }
-                         }
-                         else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
-                         {
-                             if ( event.getData().getPath().equals("/test/snafu") )
-                             {
-                                 addedLatch.countDown();
-                             }
-                         }
-                     }
-                 }
-             );
-             cache.rebuildTestExchanger = new Exchanger<Object>();
-             ExecutorService service = Executors.newSingleThreadExecutor();
-             try
-             {
-                 final AtomicReference<String> deletedPath = new AtomicReference<String>();
-                 Future<Object> future = service.submit
-                 (
-                     new Callable<Object>()
-                     {
-                         @Override
-                         public Object call() throws Exception
-                         {
-                             cache.rebuildTestExchanger.exchange(new Object());
-
-                             // simulate another process adding a node while we're rebuilding
-                             client.create().forPath("/test/test");
-
-                             List<ChildData> currentData = cache.getCurrentData();
-                             Assert.assertTrue(currentData.size() > 0);
-
-                             // simulate another process removing a node while we're rebuilding
-                             client.delete().forPath(currentData.get(0).getPath());
-                             deletedPath.set(currentData.get(0).getPath());
-
-                             cache.rebuildTestExchanger.exchange(new Object());
-
-                             // poll until the cache knows about /test/snafu
-                             ChildData childData = null;
-                             while ( childData == null )
-                             {
-                                 childData = cache.getCurrentData("/test/snafu");
-                                 Thread.sleep(1000);
-                             }
-                             Assert.assertEquals(childData.getData(), "original".getBytes());
-                             client.setData().forPath("/test/snafu", "grilled".getBytes());
-
-                             cache.rebuildTestExchanger.exchange(new Object());
-
-                             return null;
-                         }
-                     }
-                 );
-                 cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-                 // rethrows (wrapped) any assertion failure raised inside the helper task
-                 future.get();
-
-                 Assert.assertTrue(timing.awaitLatch(addedLatch));
-                 Assert.assertNotNull(cache.getCurrentData("/test/test"));
-                 Assert.assertNull(cache.getCurrentData(deletedPath.get()));
-                 Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
-             }
-             finally
-             {
-                 // the executor's worker thread would otherwise outlive the test, e.g. when
-                 // the helper is still blocked in exchange() after a failure
-                 service.shutdownNow();
-             }
-         }
-     }
-     finally
-     {
-         TestCleanState.closeAndTestClean(client);
-     }
- }
-
- // see https://github.com/Netflix/curator/issues/27 - was caused by not comparing old->new data
- //@Test
- public void testIssue27() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- client.create().forPath("/base");
- client.create().forPath("/base/a");
- client.create().forPath("/base/b");
- client.create().forPath("/base/c");
-
- client.getChildren().forPath("/base");
-
- final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
- final Semaphore semaphore = new Semaphore(0);
- cache = new PathChildrenCache(client, "/base", true);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- events.add(event.getType());
- semaphore.release();
- }
- }
- );
- cache.start();
-
- Assert.assertTrue(timing.acquireSemaphore(semaphore, 3));
-
- client.delete().forPath("/base/a");
- Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
- client.create().forPath("/base/a");
- Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
- (
- PathChildrenCacheEvent.Type.CHILD_ADDED,
- PathChildrenCacheEvent.Type.CHILD_ADDED,
- PathChildrenCacheEvent.Type.CHILD_ADDED,
- PathChildrenCacheEvent.Type.CHILD_REMOVED,
- PathChildrenCacheEvent.Type.CHILD_ADDED
- );
- Assert.assertEquals(expected, events);
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- // test Issue 27 using new rebuild() method
- //@Test
- public void testIssue27Alt() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- try
- {
- client.create().forPath("/base");
- client.create().forPath("/base/a");
- client.create().forPath("/base/b");
- client.create().forPath("/base/c");
-
- client.getChildren().forPath("/base");
-
- final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
- final Semaphore semaphore = new Semaphore(0);
- cache = new PathChildrenCache(client, "/base", true);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- events.add(event.getType());
- semaphore.release();
- }
- }
- );
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
- client.delete().forPath("/base/a");
- Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
- client.create().forPath("/base/a");
- Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
-
- List<PathChildrenCacheEvent.Type> expected = Lists.newArrayList
- (
- PathChildrenCacheEvent.Type.CHILD_REMOVED,
- PathChildrenCacheEvent.Type.CHILD_ADDED
- );
- Assert.assertEquals(expected, events);
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- /**
- * Kills the client's ZooKeeper session and checks that the cache listener receives
- * CONNECTION_LOST, CONNECTION_RECONNECTED and a CHILD_REMOVED event. The only child
- * created by this test is the ephemeral node "/test/me".
- * (Currently disabled: the @Test annotation is commented out.)
- */
- //@Test
- public void testKilledSession() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = null;
- try
- {
- client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
- client.create().forPath("/test");
-
- cache = new PathChildrenCache(client, "/test", true);
- cache.start();
-
- // one latch per event type the listener below reacts to
- final CountDownLatch childAddedLatch = new CountDownLatch(1);
- final CountDownLatch lostLatch = new CountDownLatch(1);
- final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- final CountDownLatch removedLatch = new CountDownLatch(1);
- // note: the listener is registered after cache.start(), but before any child exists
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
- {
- childAddedLatch.countDown();
- }
- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST )
- {
- lostLatch.countDown();
- }
- else if ( event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
- {
- reconnectedLatch.countDown();
- }
- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
- {
- removedLatch.countDown();
- }
- }
- }
- );
-
- // ephemeral node: expected to be removed once the session that created it is killed
- client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
- Assert.assertTrue(timing.awaitLatch(childAddedLatch));
-
- KillSession.kill(client.getZookeeperClient().getZooKeeper());
- // the latches are awaited one after another, but the events may arrive in any order
- Assert.assertTrue(timing.awaitLatch(lostLatch));
- Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
- Assert.assertTrue(timing.awaitLatch(removedLatch));
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- /**
-  * Runs {@code internalTestMode} twice against the same client: first with data caching
-  * switched off, then with it switched on.
-  * (Currently disabled: the {@code @Test} annotation is commented out.)
-  */
- //@Test
- public void testModes() throws Exception
- {
-     final boolean[] cacheDataModes = {false, true};
-
-     CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-     client.start();
-     try
-     {
-         client.create().forPath("/test");
-
-         for ( int i = 0; i < cacheDataModes.length; ++i )
-         {
-             internalTestMode(client, cacheDataModes[i]);
-
-             // remove the children created by internalTestMode before the next pass
-             client.delete().forPath("/test/one");
-             client.delete().forPath("/test/two");
-         }
-     }
-     finally
-     {
-         TestCleanState.closeAndTestClean(client);
-     }
- }
-
- /**
- * Checks that rebuildNode("/test/one") refreshes the cached data for that node without
- * going through the overridden getDataAndStat() (the call counter must not change).
- * (Currently disabled: the @Test annotation is commented out.)
- */
- //@Test
- public void testRebuildNode() throws Exception
- {
- PathChildrenCache cache = null;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- try
- {
- client.start();
- client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
-
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicInteger counter = new AtomicInteger();
- // a single permit: the first getDataAndStat() call proceeds, any later call blocks
- // in acquire() until the release(1000) at the end of the test
- final Semaphore semaphore = new Semaphore(1);
- cache = new PathChildrenCache(client, "/test", true)
- {
- @Override
- void getDataAndStat(String fullPath) throws Exception
- {
- semaphore.acquire();
- counter.incrementAndGet();
- super.getDataAndStat(fullPath);
- latch.countDown();
- }
- };
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-
- // wait until getDataAndStat() has completed once
- Assert.assertTrue(timing.awaitLatch(latch));
-
- int saveCounter = counter.get();
- client.setData().forPath("/test/one", "alt".getBytes());
- cache.rebuildNode("/test/one");
- Assert.assertEquals(cache.getCurrentData("/test/one").getData(), "alt".getBytes());
- // counter unchanged: no further getDataAndStat() call got past the semaphore
- Assert.assertEquals(saveCounter, counter.get());
-
- // unblock any getDataAndStat() calls still waiting on the semaphore before closing
- semaphore.release(1000);
- timing.sleepABit();
- }
- finally
- {
- CloseableUtils.closeQuietly(cache);
- TestCleanState.closeAndTestClean(client);
- }
- }
-
- /**
-  * Starts a cache on "/test", creates the children "/test/one" and "/test/two" and checks
-  * what the cache holds for each child once both CHILD_ADDED events have arrived.
-  *
-  * @param client    client used to build the cache and to create the child nodes
-  * @param cacheData passed to the cache constructor; when true every cached child must
-  *                  carry data, when false the data must be null (the stat is required
-  *                  in both cases)
-  * @throws Exception on any ZooKeeper/Curator error
-  */
- private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
- {
-     try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) )
-     {
-         final CountDownLatch latch = new CountDownLatch(2);
-         cache.getListenable().addListener
-         (
-             new PathChildrenCacheListener()
-             {
-                 @Override
-                 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                 {
-                     if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
-                     {
-                         latch.countDown();
-                     }
-                 }
-             }
-         );
-         cache.start();
-
-         client.create().forPath("/test/one", "one".getBytes());
-         client.create().forPath("/test/two", "two".getBytes());
-         // use the shared Timing helper like the other tests instead of a fixed 10 seconds
-         Assert.assertTrue(timing.awaitLatch(latch));
-
-         for ( ChildData data : cache.getCurrentData() )
-         {
-             if ( cacheData )
-             {
-                 Assert.assertNotNull(data.getData());
-             }
-             else
-             {
-                 Assert.assertNull(data.getData());
-             }
-             Assert.assertNotNull(data.getStat());
-         }
-     }
- }
-
- /**
-  * Creates, updates and deletes "/test/one" under a single-level {@code CuratorCache} and
-  * checks that NODE_CREATED, NODE_CHANGED and NODE_DELETED are delivered in that order and
-  * that the cache holds the updated data.
-  */
- @Test
- public void testBasics() throws Exception
- {
-     // events for /test/one only, in arrival order
-     final BlockingQueue<CacheEvent> observed = new LinkedBlockingQueue<>();
-     final CacheListener recorder = new CacheListener()
-     {
-         @Override
-         public void process(CacheEvent event, String path)
-         {
-             if ( path.equals("/test/one") )
-             {
-                 observed.offer(event);
-             }
-         }
-     };
-
-     CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-     client.start();
-     try
-     {
-         client.create().forPath("/test");
-
-         try ( CuratorCache cache = CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
-         {
-             cache.getListenable().addListener(recorder);
-             cache.start();
-
-             CacheEvent received;
-
-             client.create().forPath("/test/one", "hey there".getBytes());
-             received = observed.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-             Assert.assertEquals(received, CacheEvent.NODE_CREATED);
-
-             client.setData().forPath("/test/one", "sup!".getBytes());
-             received = observed.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-             Assert.assertEquals(received, CacheEvent.NODE_CHANGED);
-             Assert.assertTrue(cache.exists("/test/one"));
-             String cachedText = new String(cache.get("/test/one").getData());
-             Assert.assertEquals(cachedText, "sup!");
-
-             client.delete().forPath("/test/one");
-             received = observed.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS);
-             Assert.assertEquals(received, CacheEvent.NODE_DELETED);
-         }
-     }
-     finally
-     {
-         TestCleanState.closeAndTestClean(client);
-     }
- }
-
- /**
-  * Runs two caches on "/test" that share one single-thread executor and checks that both
-  * report CHILD_ADDED, CHILD_UPDATED and CHILD_REMOVED for "/test/one" and that both hold
-  * the updated data.
-  * (Currently disabled: the {@code @Test} annotation is commented out.)
-  *
-  * @throws Exception on any ZooKeeper/Curator error
-  */
- //@Test
- public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
- {
-     CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-     client.start();
-     try
-     {
-         client.create().forPath("/test");
-
-         final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-         final ExecutorService exec = Executors.newSingleThreadExecutor();
-         try
-         {
-             try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-             {
-                 cache.getListenable().addListener
-                 (
-                     new PathChildrenCacheListener()
-                     {
-                         @Override
-                         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                         {
-                             if ( event.getData().getPath().equals("/test/one") )
-                             {
-                                 events.offer(event.getType());
-                             }
-                         }
-                     }
-                 );
-                 cache.start();
-
-                 final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-                 try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
-                 {
-                     cache2.getListenable().addListener
-                     (
-                         new PathChildrenCacheListener()
-                         {
-                             @Override
-                             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-                             {
-                                 if ( event.getData().getPath().equals("/test/one") )
-                                 {
-                                     events2.offer(event.getType());
-                                 }
-                             }
-                         }
-                     );
-                     cache2.start();
-
-                     client.create().forPath("/test/one", "hey there".getBytes());
-                     Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-                     Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-
-                     client.setData().forPath("/test/one", "sup!".getBytes());
-                     Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                     Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
-                     Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
-                     Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
-
-                     client.delete().forPath("/test/one");
-                     Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                     Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-                 }
-             }
-         }
-         finally
-         {
-             // the executor is owned by this test, not by the caches; without this its
-             // worker thread outlives the test
-             exec.shutdownNow();
-         }
-     }
-     finally
-     {
-         TestCleanState.closeAndTestClean(client);
-     }
- }
-
- /**
-  * Checks that a cache stops using its executor once it has been closed: after the cache is
-  * closed, deleting a child must not cause {@code execute} to be called on the executor.
-  * (Currently disabled: the {@code @Test} annotation is commented out.)
-  *
-  * @throws Exception on any ZooKeeper/Curator error
-  */
- //@Test
- public void testDeleteNodeAfterCloseDoesntCallExecutor()
-     throws Exception
- {
-     CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-     client.start();
-     try
-     {
-         client.create().forPath("/test");
-
-         // keep a handle on the real executor so it can be shut down when the test ends
-         final ExecutorService delegate = Executors.newSingleThreadExecutor();
-         try
-         {
-             final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(delegate);
-             try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
-             {
-                 cache.start();
-                 client.create().forPath("/test/one", "hey there".getBytes());
-
-                 cache.rebuild();
-                 Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
-                 Assert.assertTrue(exec.isExecuteCalled());
-
-                 // reset the flag so that only executor use after this point is detected
-                 exec.setExecuteCalled(false);
-             }
-             // closing the cache itself must not have used the executor
-             Assert.assertFalse(exec.isExecuteCalled());
-
-             client.delete().forPath("/test/one");
-             timing.sleepABit();
-             Assert.assertFalse(exec.isExecuteCalled());
-         }
-         finally
-         {
-             // without this the executor's worker thread outlives the test
-             delegate.shutdownNow();
-         }
-     }
-     finally
-     {
-         TestCleanState.closeAndTestClean(client);
-     }
-
- }
-
- /**
- * Tests the case where there's an outstanding operation being executed when the cache is
- * shut down. See CURATOR-121, this was causing misleading warning messages to be logged.
- * The test passes only if the cache's handleException() is never invoked.
- * (Currently disabled: the @Test annotation is commented out.)
- */
- //@Test
- public void testInterruptedOperationOnShutdown() throws Exception
- {
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
- client.start();
-
- try
- {
- // counted down only if the cache reports an exception through handleException()
- final CountDownLatch latch = new CountDownLatch(1);
- try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
- @Override
- protected void handleException(Throwable e)
- {
- latch.countDown();
- }
- } )
- {
- cache.start();
-
- // queue an operation that takes 5 seconds, longer than the 1 second pause below
- cache.offerOperation(new Operation()
- {
-
- @Override
- public void invoke() throws Exception
- {
- Thread.sleep(5000);
- }
- });
-
- // presumably gives the operation time to start before the cache is closed - confirm
- Thread.sleep(1000);
-
- }
-
- // the result is deliberately ignored: this only waits up to 5 seconds for a
- // (not expected) handleException() call
- latch.await(5, TimeUnit.SECONDS);
-
- // count still 1 means handleException() was never called
- Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
- }
- finally
- {
- TestCleanState.closeAndTestClean(client);
- }
- }
-}