You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/19 20:26:10 UTC
curator git commit: test for event ordering in both caches
Repository: curator
Updated Branches:
refs/heads/CURATOR-324 [created] ee34e2b77
test for event ordering in both caches
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ee34e2b7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ee34e2b7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ee34e2b7
Branch: refs/heads/CURATOR-324
Commit: ee34e2b77225b62432d73d47d648763747c165d3
Parents: 168dfd7
Author: randgalt <ra...@apache.org>
Authored: Thu May 19 15:25:57 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 19 15:25:57 2016 -0500
----------------------------------------------------------------------
.../recipes/cache/TestEventOrdering.java | 160 +++++++++++++++++++
.../TestPathChildrenCacheEventOrdering.java | 37 +++++
.../cache/TestTreeCacheEventOrdering.java | 40 +++++
3 files changed, 237 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
new file mode 100644
index 0000000..28a1655
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
@@ -0,0 +1,160 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for tests that check a cache's add/remove events against its contents.
+ * <p>
+ * {@link #THREAD_QTY} worker threads each perform {@link #ITERATIONS} randomly chosen
+ * create/setData/delete operations on one of {@link #NODE_QTY} children of {@code /root}.
+ * Afterwards the net count of {@link EventType#ADDED} minus {@link EventType#DELETED}
+ * events recorded by the cache listener must equal the number of children reported by
+ * {@link #getActualQty(Closeable)}.
+ *
+ * @param <T> type of the cache under test
+ */
+public abstract class TestEventOrdering<T extends Closeable> extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+    // reference point for the relative timestamps printed by asString()
+    private final long start = System.currentTimeMillis();
+    private static final int THREAD_QTY = 100;
+    private static final int ITERATIONS = 100;
+    private static final int NODE_QTY = 10;
+
+    /**
+     * The kinds of cache notification this test records.
+     */
+    public enum EventType
+    {
+        ADDED,
+        DELETED
+    }
+
+    /**
+     * A single recorded cache notification: what happened, to which path, and the
+     * wall-clock time (milliseconds) at which this record was constructed.
+     */
+    public static class Event
+    {
+        public final EventType eventType;
+        public final String path;
+        public final long time = System.currentTimeMillis();
+
+        /**
+         * @param eventType whether the node was added or deleted
+         * @param path      full path of the affected node
+         */
+        public Event(EventType eventType, String path)
+        {
+            this.eventType = eventType;
+            this.path = path;
+        }
+    }
+
+    /**
+     * Runs the concurrent workload against {@code /root} and asserts that the events
+     * queued by the cache listener add up to the child count the cache reports.
+     *
+     * @throws Exception on any unexpected client, cache or threading failure
+     */
+    @Test
+    public void testEventOrdering() throws Exception
+    {
+        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_QTY);
+        BlockingQueue<Event> events = Queues.newLinkedBlockingQueue();
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        T cache = null;
+        try
+        {
+            client.start();
+            client.create().forPath("/root");
+            cache = newCache(client, "/root", events);
+
+            final Random random = new Random();
+            final CountDownLatch latch = new CountDownLatch(THREAD_QTY);
+            final Callable<Void> task = new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    try
+                    {
+                        for ( int i = 0; i < ITERATIONS; ++i )
+                        {
+                            String node = "/root/" + random.nextInt(NODE_QTY);
+                            try
+                            {
+                                switch ( random.nextInt(3) )
+                                {
+                                    default:
+                                    case 0:
+                                        client.create().forPath(node);
+                                        break;
+
+                                    case 1:
+                                        client.setData().forPath(node, "new".getBytes());
+                                        break;
+
+                                    case 2:
+                                        client.delete().forPath(node);
+                                        break;
+                                }
+                            }
+                            catch ( KeeperException ignore )
+                            {
+                                // The operations are chosen blindly, so failures such as creating an
+                                // existing node or deleting a missing one are part of the workload.
+                            }
+                        }
+                        return null;
+                    }
+                    finally
+                    {
+                        // count down even when the task dies, so the test thread is released
+                        latch.countDown();
+                    }
+                }
+            };
+
+            // the task holds no per-thread state, so one instance is shared by every worker
+            for ( int i = 0; i < THREAD_QTY; ++i )
+            {
+                executorService.submit(task);
+            }
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            // intended to let notifications still in flight reach the listener before draining
+            timing.sleepABit();
+
+            List<Event> localEvents = Lists.newArrayList();
+            events.drainTo(localEvents);
+            int eventSuggestedQty = 0;
+            for ( Event event : localEvents )
+            {
+                eventSuggestedQty += (event.eventType == EventType.ADDED) ? 1 : -1;
+            }
+            int actualQty = getActualQty(cache);
+            Assert.assertEquals(actualQty, eventSuggestedQty, String.format("actual %s expected %s:\n %s", actualQty, eventSuggestedQty, asString(localEvents)));
+        }
+        finally
+        {
+            try
+            {
+                executorService.shutdownNow();
+                //noinspection ThrowFromFinallyBlock
+                executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            }
+            finally
+            {
+                // awaitTermination() may throw InterruptedException; close the cache and client regardless
+                CloseableUtils.closeQuietly(cache);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+    }
+
+    /**
+     * Reports how many child nodes the cache currently holds; the test compares this
+     * against the net count derived from the recorded events.
+     *
+     * @param cache the cache created by {@link #newCache}
+     * @return number of children currently held by the cache
+     */
+    protected abstract int getActualQty(T cache);
+
+    /**
+     * Creates the cache under test. This class never starts the cache itself, so
+     * implementations are expected to return it already started, with a listener
+     * that appends an {@link Event} to {@code events} for every add and remove.
+     *
+     * @param client started client to build the cache on
+     * @param path   parent path to watch
+     * @param events queue the listener must record events into
+     * @return the cache instance; it is closed by the test when it finishes
+     * @throws Exception if the cache cannot be created or started
+     */
+    protected abstract T newCache(CuratorFramework client, String path, BlockingQueue<Event> events) throws Exception;
+
+    /**
+     * Formats events one per line as {@code TYPE path @ millis}, where millis is the
+     * event's time relative to the creation of this test instance.
+     */
+    private String asString(List<Event> events)
+    {
+        StringBuilder str = new StringBuilder();
+        for ( Event event : events )
+        {
+            str.append(event.eventType).append(" ").append(event.path).append(" @ ").append(event.time - start);
+            str.append("\n");
+        }
+        return str.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
new file mode 100644
index 0000000..a4cb882
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
@@ -0,0 +1,37 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Runs the {@link TestEventOrdering} workload against a {@link PathChildrenCache}.
+ */
+public class TestPathChildrenCacheEventOrdering extends TestEventOrdering<PathChildrenCache>
+{
+    /**
+     * @return the number of child entries the cache currently holds
+     */
+    @Override
+    protected int getActualQty(PathChildrenCache cache)
+    {
+        return cache.getCurrentData().size();
+    }
+
+    /**
+     * Builds a {@link PathChildrenCache} on {@code path}, registers a listener that records
+     * child additions and removals into {@code events}, and starts the cache in
+     * {@code BUILD_INITIAL_CACHE} mode.
+     */
+    @Override
+    protected PathChildrenCache newCache(CuratorFramework client, String path, final BlockingQueue<Event> events) throws Exception
+    {
+        PathChildrenCache childrenCache = new PathChildrenCache(client, path, false);
+        childrenCache.getListenable().addListener(new PathChildrenCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+            {
+                // map the cache's event type onto the test's own; anything else is not recorded
+                PathChildrenCacheEvent.Type type = event.getType();
+                EventType recorded = null;
+                if ( type == PathChildrenCacheEvent.Type.CHILD_ADDED )
+                {
+                    recorded = EventType.ADDED;
+                }
+                else if ( type == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+                {
+                    recorded = EventType.DELETED;
+                }
+
+                if ( recorded != null )
+                {
+                    events.add(new Event(recorded, event.getData().getPath()));
+                }
+            }
+        });
+        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+        return childrenCache;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
new file mode 100644
index 0000000..4a306d6
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
@@ -0,0 +1,40 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Runs the {@link TestEventOrdering} workload against a {@link TreeCache}.
+ */
+public class TestTreeCacheEventOrdering extends TestEventOrdering<TreeCache>
+{
+    /**
+     * @return the number of children the cache currently holds under {@code /root}
+     */
+    @Override
+    protected int getActualQty(TreeCache cache)
+    {
+        return cache.getCurrentChildren("/root").size();
+    }
+
+    /**
+     * Builds a {@link TreeCache} on {@code path}, registers a listener that records node
+     * additions and removals below {@code /root/} into {@code events}, and starts the cache.
+     */
+    @Override
+    protected TreeCache newCache(CuratorFramework client, String path, final BlockingQueue<Event> events) throws Exception
+    {
+        TreeCache treeCache = new TreeCache(client, path);
+        treeCache.getListenable().addListener(new TreeCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+            {
+                // only events that carry data for a node strictly below /root are of interest
+                if ( (event.getData() == null) || !event.getData().getPath().startsWith("/root/") )
+                {
+                    return;
+                }
+
+                TreeCacheEvent.Type type = event.getType();
+                EventType recorded = null;
+                if ( type == TreeCacheEvent.Type.NODE_ADDED )
+                {
+                    recorded = EventType.ADDED;
+                }
+                else if ( type == TreeCacheEvent.Type.NODE_REMOVED )
+                {
+                    recorded = EventType.DELETED;
+                }
+
+                if ( recorded != null )
+                {
+                    events.add(new Event(recorded, event.getData().getPath()));
+                }
+            }
+        });
+        treeCache.start();
+        return treeCache;
+    }
+}