You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/21 17:15:25 UTC
curator git commit: PathChildrenCache was not handling CONNECTED state
Repository: curator
Updated Branches:
refs/heads/CURATOR-328 [created] 2f33fafbc
PathChildrenCache was not handling CONNECTED state
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2f33fafb
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2f33fafb
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2f33fafb
Branch: refs/heads/CURATOR-328
Commit: 2f33fafbc58303009d2f78a5ed0df715a799a3c9
Parents: 115346b
Author: randgalt <ra...@apache.org>
Authored: Sat May 21 12:15:10 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat May 21 12:15:10 2016 -0500
----------------------------------------------------------------------
.../recipes/cache/PathChildrenCache.java | 1 +
.../recipes/cache/TestPathChildrenCache.java | 166 ++++++++++++-------
2 files changed, 106 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2f33fafb/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index ae30da9..568d03d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -640,6 +640,7 @@ public class PathChildrenCache implements Closeable
break;
}
+ case CONNECTED:
case RECONNECTED:
{
try
http://git-wip-us.apache.org/repos/asf/curator/blob/2f33fafb/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 14d061f..887df54 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -16,56 +16,96 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.cache;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
-import org.apache.log4j.Appender;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.SimpleLayout;
-import org.apache.log4j.spi.LoggingEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.Test;
-
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.testng.AssertJUnit.assertNotNull;
+
public class TestPathChildrenCache extends BaseClassForTests
{
@Test
+ public void testWithBadConnect() throws Exception
+ {
+ final int serverPort = server.getPort();
+ server.close();
+
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final CountDownLatch ensurePathLatch = new CountDownLatch(1);
+ PathChildrenCache cache = new PathChildrenCache(client, "/", true)
+ {
+ @Override
+ protected void ensurePath() throws Exception
+ {
+ try
+ {
+ super.ensurePath();
+ }
+ catch ( Exception e )
+ {
+ ensurePathLatch.countDown();
+ throw e;
+ }
+ }
+ };
+ final CountDownLatch addedLatch = new CountDownLatch(1);
+ PathChildrenCacheListener listener = new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ {
+ addedLatch.countDown();
+ }
+ }
+ };
+ cache.getListenable().addListener(listener);
+ cache.start();
+ Assert.assertTrue(timing.awaitLatch(ensurePathLatch));
+
+ server = new TestingServer(serverPort, true);
+
+ client.create().creatingParentContainersIfNeeded().forPath("/baz", new byte[]{1, 2, 3});
+
+ assertNotNull("/baz does not exist", client.checkExists().forPath("/baz"));
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+ assertNotNull("cache doesn't see /baz", cache.getCurrentData("/baz"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testPostInitializedForEmpty() throws Exception
{
Timing timing = new Timing();
@@ -78,19 +118,19 @@ public class TestPathChildrenCache extends BaseClassForTests
final CountDownLatch latch = new CountDownLatch(1);
cache = new PathChildrenCache(client, "/test", true);
cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
- {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ (
+ new PathChildrenCacheListener()
{
- if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
- latch.countDown();
+ if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+ {
+ latch.countDown();
+ }
}
}
- }
- );
+ );
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
Assert.assertTrue(timing.awaitLatch(latch));
}
@@ -212,20 +252,20 @@ public class TestPathChildrenCache extends BaseClassForTests
final CountDownLatch addedLatch = new CountDownLatch(3);
cache.getListenable().addListener
- (
- new PathChildrenCacheListener()
+ (
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ Assert.assertNotEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED);
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
{
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
- {
- Assert.assertNotEquals(event.getType(), PathChildrenCacheEvent.Type.INITIALIZED);
- if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
- {
- addedLatch.countDown();
- }
- }
+ addedLatch.countDown();
}
- );
+ }
+ }
+ );
client.create().forPath("/test/1", "1".getBytes());
client.create().forPath("/test/2", "2".getBytes());
@@ -845,18 +885,19 @@ public class TestPathChildrenCache extends BaseClassForTests
final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
cache2.getListenable().addListener(
- new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
- throws Exception
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+ throws Exception
+ {
+ if ( event.getData().getPath().equals("/test/one") )
{
- if ( event.getData().getPath().equals("/test/one") )
- {
- events2.offer(event.getType());
- }
+ events2.offer(event.getType());
}
}
- );
+ }
+ );
cache2.start();
client.create().forPath("/test/one", "hey there".getBytes());
@@ -884,7 +925,7 @@ public class TestPathChildrenCache extends BaseClassForTests
@Test
public void testDeleteNodeAfterCloseDoesntCallExecutor()
- throws Exception
+ throws Exception
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
@@ -911,7 +952,8 @@ public class TestPathChildrenCache extends BaseClassForTests
timing.sleepABit();
Assert.assertFalse(exec.isExecuteCalled());
}
- finally {
+ finally
+ {
client.close();
}
@@ -931,7 +973,8 @@ public class TestPathChildrenCache extends BaseClassForTests
try
{
final CountDownLatch latch = new CountDownLatch(1);
- final PathChildrenCache cache = new PathChildrenCache(client, "/test", false) {
+ final PathChildrenCache cache = new PathChildrenCache(client, "/test", false)
+ {
@Override
protected void handleException(Throwable e)
{
@@ -957,7 +1000,8 @@ public class TestPathChildrenCache extends BaseClassForTests
latch.await(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
- } finally
+ }
+ finally
{
CloseableUtils.closeQuietly(client);
}