You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/10/10 23:52:02 UTC
[3/6] curator git commit: Trying to make tests more reliable
Trying to make tests more reliable
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b25a8a35
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b25a8a35
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b25a8a35
Branch: refs/heads/CURATOR-3.0
Commit: b25a8a35856abf9710d42fae0a7324fbe66c362d
Parents: 967faf1
Author: randgalt <ra...@apache.org>
Authored: Sat Oct 10 15:15:50 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Oct 10 15:15:50 2015 -0500
----------------------------------------------------------------------
.../recipes/cache/PathChildrenCache.java | 6 +-
.../framework/recipes/cache/TreeCache.java | 21 +-
.../recipes/cache/TestPathChildrenCache.java | 432 +++++++++----------
3 files changed, 220 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 99a652d..e4e18d9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -315,7 +315,7 @@ public class PathChildrenCache implements Closeable
*/
public void rebuild() throws Exception
{
- Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+ Preconditions.checkState(state.get() == State.STARTED, "cache has been closed");
ensurePath();
@@ -347,7 +347,7 @@ public class PathChildrenCache implements Closeable
public void rebuildNode(String fullPath) throws Exception
{
Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
- Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
+ Preconditions.checkState(state.get() == State.STARTED, "cache has been closed");
ensurePath();
internalRebuildNode(fullPath);
@@ -370,8 +370,6 @@ public class PathChildrenCache implements Closeable
client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
executorService.close();
- client.clearWatcherReferences(childrenWatcher);
- client.clearWatcherReferences(dataWatcher);
client.removeWatchers();
// TODO
http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index bda00bf..8030e8b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -248,18 +248,24 @@ public class TreeCache implements Closeable
private void doRefreshChildren() throws Exception
{
- client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ if ( treeState.get() == TreeState.STARTED )
+ {
+ client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
+ }
}
private void doRefreshData() throws Exception
{
- if ( dataIsCompressed )
- {
- client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
- }
- else
+ if ( treeState.get() == TreeState.STARTED )
{
- client.getData().usingWatcher(this).inBackground(this).forPath(path);
+ if ( dataIsCompressed )
+ {
+ client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
+ }
+ else
+ {
+ client.getData().usingWatcher(this).inBackground(this).forPath(path);
+ }
}
}
@@ -285,7 +291,6 @@ public class TreeCache implements Closeable
{
stat.set(null);
data.set(null);
- client.clearWatcherReferences(this);
ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
if ( childMap != null )
{
http://git-wip-us.apache.org/repos/asf/curator/blob/b25a8a35/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 3571ca7..a4e2b2e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -18,15 +18,9 @@
*/
package org.apache.curator.framework.recipes.cache;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
@@ -35,31 +29,12 @@ import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.log4j.Appender;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.SimpleLayout;
-import org.apache.log4j.spi.LoggingEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.Test;
-
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -348,48 +323,48 @@ public class TestPathChildrenCache extends BaseClassForTests
final CountDownLatch removedLatch = new CountDownLatch(1);
final CountDownLatch postRemovedLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
- PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
+ {
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
- {
- removedLatch.countDown();
- Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
- }
- else
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- try
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED )
{
- Assert.assertEquals(event.getData().getData(), "two".getBytes());
+ removedLatch.countDown();
+ Assert.assertTrue(postRemovedLatch.await(10, TimeUnit.SECONDS));
}
- finally
+ else
{
- dataLatch.countDown();
+ try
+ {
+ Assert.assertEquals(event.getData().getData(), "two".getBytes());
+ }
+ finally
+ {
+ dataLatch.countDown();
+ }
}
}
}
- }
- );
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ );
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
- client.delete().forPath("/test/foo");
- Assert.assertTrue(timing.awaitLatch(removedLatch));
- client.create().forPath("/test/foo", "two".getBytes());
- postRemovedLatch.countDown();
- Assert.assertTrue(timing.awaitLatch(dataLatch));
+ client.delete().forPath("/test/foo");
+ Assert.assertTrue(timing.awaitLatch(removedLatch));
+ client.create().forPath("/test/foo", "two".getBytes());
+ postRemovedLatch.countDown();
+ Assert.assertTrue(timing.awaitLatch(dataLatch));
- Throwable t = error.get();
- if ( t != null )
- {
- Assert.fail("Assert", t);
+ Throwable t = error.get();
+ if ( t != null )
+ {
+ Assert.fail("Assert", t);
+ }
}
-
- cache.close();
}
finally
{
@@ -411,79 +386,79 @@ public class TestPathChildrenCache extends BaseClassForTests
client.create().forPath("/test/snafu", "original".getBytes());
final CountDownLatch addedLatch = new CountDownLatch(2);
- final PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
+ {
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- if ( event.getData().getPath().equals("/test/test") )
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
{
- addedLatch.countDown();
+ if ( event.getData().getPath().equals("/test/test") )
+ {
+ addedLatch.countDown();
+ }
}
- }
- else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
- {
- if ( event.getData().getPath().equals("/test/snafu") )
+ else if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED )
{
- addedLatch.countDown();
+ if ( event.getData().getPath().equals("/test/snafu") )
+ {
+ addedLatch.countDown();
+ }
}
}
}
- }
- );
- cache.rebuildTestExchanger = new Exchanger<Object>();
- ExecutorService service = Executors.newSingleThreadExecutor();
- final AtomicReference<String> deletedPath = new AtomicReference<String>();
- Future<Object> future = service.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ );
+ cache.rebuildTestExchanger = new Exchanger<Object>();
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ final AtomicReference<String> deletedPath = new AtomicReference<String>();
+ Future<Object> future = service.submit
+ (
+ new Callable<Object>()
{
- cache.rebuildTestExchanger.exchange(new Object());
+ @Override
+ public Object call() throws Exception
+ {
+ cache.rebuildTestExchanger.exchange(new Object());
- // simulate another process adding a node while we're rebuilding
- client.create().forPath("/test/test");
+ // simulate another process adding a node while we're rebuilding
+ client.create().forPath("/test/test");
- List<ChildData> currentData = cache.getCurrentData();
- Assert.assertTrue(currentData.size() > 0);
+ List<ChildData> currentData = cache.getCurrentData();
+ Assert.assertTrue(currentData.size() > 0);
- // simulate another process removing a node while we're rebuilding
- client.delete().forPath(currentData.get(0).getPath());
- deletedPath.set(currentData.get(0).getPath());
+ // simulate another process removing a node while we're rebuilding
+ client.delete().forPath(currentData.get(0).getPath());
+ deletedPath.set(currentData.get(0).getPath());
- cache.rebuildTestExchanger.exchange(new Object());
+ cache.rebuildTestExchanger.exchange(new Object());
- ChildData childData = null;
- while ( childData == null )
- {
- childData = cache.getCurrentData("/test/snafu");
- Thread.sleep(1000);
- }
- Assert.assertEquals(childData.getData(), "original".getBytes());
- client.setData().forPath("/test/snafu", "grilled".getBytes());
+ ChildData childData = null;
+ while ( childData == null )
+ {
+ childData = cache.getCurrentData("/test/snafu");
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(childData.getData(), "original".getBytes());
+ client.setData().forPath("/test/snafu", "grilled".getBytes());
- cache.rebuildTestExchanger.exchange(new Object());
+ cache.rebuildTestExchanger.exchange(new Object());
- return null;
+ return null;
+ }
}
- }
- );
- cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
- future.get();
-
- Assert.assertTrue(timing.awaitLatch(addedLatch));
- Assert.assertNotNull(cache.getCurrentData("/test/test"));
- Assert.assertNull(cache.getCurrentData(deletedPath.get()));
- Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+ );
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ future.get();
- cache.close();
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+ Assert.assertNotNull(cache.getCurrentData("/test/test"));
+ Assert.assertNull(cache.getCurrentData(deletedPath.get()));
+ Assert.assertEquals(cache.getCurrentData("/test/snafu").getData(), "grilled".getBytes());
+ }
}
finally
{
@@ -653,7 +628,7 @@ public class TestPathChildrenCache extends BaseClassForTests
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", "data".getBytes());
Assert.assertTrue(timing.awaitLatch(childAddedLatch));
- KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+ KillSession.kill(client.getZookeeperClient().getZooKeeper());
Assert.assertTrue(timing.awaitLatch(lostLatch));
Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
Assert.assertTrue(timing.awaitLatch(removedLatch));
@@ -695,9 +670,9 @@ public class TestPathChildrenCache extends BaseClassForTests
Timing timing = new Timing();
PathChildrenCache cache = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- client.start();
try
{
+ client.start();
client.create().creatingParentsIfNeeded().forPath("/test/one", "one".getBytes());
final CountDownLatch latch = new CountDownLatch(1);
@@ -716,7 +691,7 @@ public class TestPathChildrenCache extends BaseClassForTests
};
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
- latch.await();
+ Assert.assertTrue(timing.awaitLatch(latch));
int saveCounter = counter.get();
client.setData().forPath("/test/one", "alt".getBytes());
@@ -725,6 +700,7 @@ public class TestPathChildrenCache extends BaseClassForTests
Assert.assertEquals(saveCounter, counter.get());
semaphore.release(1000);
+ timing.sleepABit();
}
finally
{
@@ -735,44 +711,43 @@ public class TestPathChildrenCache extends BaseClassForTests
private void internalTestMode(CuratorFramework client, boolean cacheData) throws Exception
{
- PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData);
-
- final CountDownLatch latch = new CountDownLatch(2);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", cacheData) )
+ {
+ final CountDownLatch latch = new CountDownLatch(2);
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- latch.countDown();
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ {
+ latch.countDown();
+ }
}
}
- }
- );
- cache.start();
+ );
+ cache.start();
- client.create().forPath("/test/one", "one".getBytes());
- client.create().forPath("/test/two", "two".getBytes());
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ client.create().forPath("/test/one", "one".getBytes());
+ client.create().forPath("/test/two", "two".getBytes());
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- for ( ChildData data : cache.getCurrentData() )
- {
- if ( cacheData )
- {
- Assert.assertNotNull(data.getData());
- Assert.assertNotNull(data.getStat());
- }
- else
+ for ( ChildData data : cache.getCurrentData() )
{
- Assert.assertNull(data.getData());
- Assert.assertNotNull(data.getStat());
+ if ( cacheData )
+ {
+ Assert.assertNotNull(data.getData());
+ Assert.assertNotNull(data.getStat());
+ }
+ else
+ {
+ Assert.assertNull(data.getData());
+ Assert.assertNotNull(data.getStat());
+ }
}
}
-
- cache.close();
}
@Test
@@ -786,34 +761,34 @@ public class TestPathChildrenCache extends BaseClassForTests
client.create().forPath("/test");
final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
- PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true) )
+ {
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getData().getPath().equals("/test/one") )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- events.offer(event.getType());
+ if ( event.getData().getPath().equals("/test/one") )
+ {
+ events.offer(event.getType());
+ }
}
}
- }
- );
- cache.start();
-
- client.create().forPath("/test/one", "hey there".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+ );
+ cache.start();
- client.setData().forPath("/test/one", "sup!".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
- Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+ client.create().forPath("/test/one", "hey there".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
- client.delete().forPath("/test/one");
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
- cache.close();
+ client.delete().forPath("/test/one");
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
}
finally
{
@@ -833,56 +808,58 @@ public class TestPathChildrenCache extends BaseClassForTests
final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
final ExecutorService exec = Executors.newSingleThreadExecutor();
- PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
- cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getData().getPath().equals("/test/one") )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- events.offer(event.getType());
+ if ( event.getData().getPath().equals("/test/one") )
+ {
+ events.offer(event.getType());
+ }
}
}
- }
- );
- cache.start();
+ );
+ cache.start();
- final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
- PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
- cache2.getListenable().addListener(
- new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- throws Exception
+ final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+ try ( PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache2.getListenable().addListener(
+ new PathChildrenCacheListener()
{
- if ( event.getData().getPath().equals("/test/one") )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ throws Exception
{
- events2.offer(event.getType());
+ if ( event.getData().getPath().equals("/test/one") )
+ {
+ events2.offer(event.getType());
+ }
}
}
- }
- );
- cache2.start();
-
- client.create().forPath("/test/one", "hey there".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
-
- client.setData().forPath("/test/one", "sup!".getBytes());
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
- Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
- Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
-
- client.delete().forPath("/test/one");
- Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
- Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
-
- cache.close();
- cache2.close();
+ );
+ cache2.start();
+
+ client.create().forPath("/test/one", "hey there".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+ client.setData().forPath("/test/one", "sup!".getBytes());
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+ Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+ client.delete().forPath("/test/one");
+ Assert.assertEquals(events.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+ }
+ }
}
finally
{
@@ -902,17 +879,17 @@ public class TestPathChildrenCache extends BaseClassForTests
client.create().forPath("/test");
final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
- PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
-
- cache.start();
- client.create().forPath("/test/one", "hey there".getBytes());
+ try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec) )
+ {
+ cache.start();
+ client.create().forPath("/test/one", "hey there".getBytes());
- cache.rebuild();
- Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
- Assert.assertTrue(exec.isExecuteCalled());
+ cache.rebuild();
+ Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+ Assert.assertTrue(exec.isExecuteCalled());
- exec.setExecuteCalled(false);
- cache.close();
+ exec.setExecuteCalled(false);
+ }
Assert.assertFalse(exec.isExecuteCalled());
client.delete().forPath("/test/one");
@@ -940,28 +917,29 @@ public class TestPathChildrenCache extends BaseClassForTests
try
{
final CountDownLatch latch = new CountDownLatch(1);
- final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
+ try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
@Override
protected void handleException(Throwable e)
{
latch.countDown();
}
- };
- cache.start();
-
- cache.offerOperation(new Operation()
+ } )
{
+ cache.start();
- @Override
- public void invoke() throws Exception
+ cache.offerOperation(new Operation()
{
- Thread.sleep(5000);
- }
- });
- Thread.sleep(1000);
+ @Override
+ public void invoke() throws Exception
+ {
+ Thread.sleep(5000);
+ }
+ });
- cache.close();
+ Thread.sleep(1000);
+
+ }
latch.await(5, TimeUnit.SECONDS);