You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2016/05/19 20:26:10 UTC

curator git commit: test for event ordering in both caches

Repository: curator
Updated Branches:
  refs/heads/CURATOR-324 [created] ee34e2b77


test for event ordering in both caches


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ee34e2b7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ee34e2b7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ee34e2b7

Branch: refs/heads/CURATOR-324
Commit: ee34e2b77225b62432d73d47d648763747c165d3
Parents: 168dfd7
Author: randgalt <ra...@apache.org>
Authored: Thu May 19 15:25:57 2016 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 19 15:25:57 2016 -0500

----------------------------------------------------------------------
 .../recipes/cache/TestEventOrdering.java        | 160 +++++++++++++++++++
 .../TestPathChildrenCacheEventOrdering.java     |  37 +++++
 .../cache/TestTreeCacheEventOrdering.java       |  40 +++++
 3 files changed, 237 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
new file mode 100644
index 0000000..28a1655
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestEventOrdering.java
@@ -0,0 +1,160 @@
+package org.apache.curator.framework.recipes.cache;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.Closeable;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Shared stress test for cache recipes: many threads concurrently create, update and
+ * delete a small set of children under {@code /root} while a cache listens. Afterwards
+ * the net count implied by the cache's events (ADDED minus DELETED) must equal the number
+ * of children the cache itself reports, i.e. the event stream and the cache contents must
+ * agree.
+ *
+ * @param <T> the cache type under test; it is closed when the test finishes
+ */
+public abstract class TestEventOrdering<T extends Closeable> extends BaseClassForTests
+{
+    private final Timing timing = new Timing();
+    // reference point so that event times in failure messages are printed as small offsets
+    private final long start = System.currentTimeMillis();
+    private static final int THREAD_QTY = 100;
+    private static final int ITERATIONS = 100;
+    private static final int NODE_QTY = 10;
+
+    /**
+     * The two kinds of cache notifications this test keeps track of.
+     */
+    public enum EventType
+    {
+        ADDED,
+        DELETED
+    }
+
+    /**
+     * A single add/delete notification recorded by a cache listener, stamped with the
+     * wall-clock time (milliseconds) at which the record was created.
+     */
+    public static class Event
+    {
+        public final EventType eventType;
+        public final String path;
+        public final long time = System.currentTimeMillis();
+
+        /**
+         * @param eventType whether the node was added or deleted
+         * @param path path of the affected node
+         */
+        public Event(EventType eventType, String path)
+        {
+            this.eventType = eventType;
+            this.path = path;
+        }
+    }
+
+    /**
+     * Runs {@link #THREAD_QTY} workers, each performing {@link #ITERATIONS} random
+     * create/setData/delete operations on one of {@link #NODE_QTY} children of
+     * {@code /root}, then compares the net event count with the cache contents.
+     *
+     * @throws Exception on any unexpected client, cache or threading failure
+     */
+    @Test
+    public void testEventOrdering() throws Exception
+    {
+        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_QTY);
+        BlockingQueue<Event> events = Queues.newLinkedBlockingQueue();
+        final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        T cache = null;
+        try
+        {
+            client.start();
+            client.create().forPath("/root");
+            cache = newCache(client, "/root", events);
+
+            final Random random = new Random();
+            final Callable<Void> task = new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    for ( int i = 0; i < ITERATIONS; ++i )
+                    {
+                        String node = "/root/" + random.nextInt(NODE_QTY);
+                        try
+                        {
+                            switch ( random.nextInt(3) )
+                            {
+                            default:
+                            case 0:
+                                client.create().forPath(node);
+                                break;
+
+                            case 1:
+                                client.setData().forPath(node, "new".getBytes());
+                                break;
+
+                            case 2:
+                                client.delete().forPath(node);
+                                break;
+                            }
+                        }
+                        catch ( KeeperException ignore )
+                        {
+                            // the workers race on the same few nodes, so an operation may
+                            // legitimately fail (e.g. node already exists or is already gone)
+                        }
+                    }
+                    return null;
+                }
+            };
+
+            final CountDownLatch latch = new CountDownLatch(THREAD_QTY);
+            for ( int i = 0; i < THREAD_QTY; ++i )
+            {
+                Callable<Void> wrapped = new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            return task.call();
+                        }
+                        finally
+                        {
+                            // count down even if the task failed so the test never hangs here
+                            latch.countDown();
+                        }
+                    }
+                };
+                executorService.submit(wrapped);
+            }
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            // presumably leaves time for the cache to deliver events still in flight
+            timing.sleepABit();
+
+            // snapshot everything the listener has recorded so far
+            List<Event> localEvents = Lists.newArrayList();
+            events.drainTo(localEvents);
+            int eventSuggestedQty = 0;
+            for ( Event event : localEvents )
+            {
+                eventSuggestedQty += (event.eventType == EventType.ADDED) ? 1 : -1;
+            }
+            int actualQty = getActualQty(cache);
+            Assert.assertEquals(actualQty, eventSuggestedQty, String.format("actual %s expected %s:\n %s", actualQty, eventSuggestedQty, asString(localEvents)));
+        }
+        finally
+        {
+            try
+            {
+                executorService.shutdownNow();
+                //noinspection ThrowFromFinallyBlock
+                executorService.awaitTermination(timing.milliseconds(), TimeUnit.MILLISECONDS);
+            }
+            finally
+            {
+                // awaitTermination() may throw InterruptedException; the cache and the
+                // client must still be closed in that case
+                CloseableUtils.closeQuietly(cache);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+    }
+
+    /**
+     * Reports how many child nodes the given cache currently holds; this is the value the
+     * net ADDED/DELETED event count is compared against.
+     *
+     * @param cache the cache created by {@link #newCache}
+     * @return number of children currently known to the cache
+     */
+    protected abstract int getActualQty(T cache);
+
+    /**
+     * Creates and starts the cache under test. Implementations are expected to register a
+     * listener that appends an {@link Event} to {@code events} for every child added or
+     * removed.
+     *
+     * @param client started client to build the cache with
+     * @param path path the cache should watch
+     * @param events sink for the add/delete notifications seen by the cache
+     * @return the started cache
+     * @throws Exception if the cache cannot be created or started
+     */
+    protected abstract T newCache(CuratorFramework client, String path, BlockingQueue<Event> events) throws Exception;
+
+    /**
+     * Renders the events one per line as {@code TYPE path @ offset}, where the offset is
+     * the event time in milliseconds relative to the creation of this test instance.
+     */
+    private String asString(List<Event> events)
+    {
+        StringBuilder str = new StringBuilder();
+        for ( Event event : events )
+        {
+            str.append(event.eventType).append(" ").append(event.path).append(" @ ").append(event.time - start);
+            str.append("\n");
+        }
+        return str.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
new file mode 100644
index 0000000..a4cb882
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheEventOrdering.java
@@ -0,0 +1,37 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Runs the shared event-ordering test against {@link PathChildrenCache}.
+ */
+public class TestPathChildrenCacheEventOrdering extends TestEventOrdering<PathChildrenCache>
+{
+    /**
+     * @param cache the cache created by {@link #newCache}
+     * @return size of the cache's current data list
+     */
+    @Override
+    protected int getActualQty(PathChildrenCache cache)
+    {
+        return cache.getCurrentData().size();
+    }
+
+    /**
+     * Builds a {@link PathChildrenCache} on {@code path}, attaches a listener that records
+     * every CHILD_ADDED / CHILD_REMOVED notification into {@code events}, and starts the
+     * cache in {@code BUILD_INITIAL_CACHE} mode.
+     *
+     * @param client started client to build the cache with
+     * @param path path whose children are watched
+     * @param events sink receiving one {@link Event} per add/remove notification
+     * @return the started cache
+     * @throws Exception if the cache cannot be started
+     */
+    @Override
+    protected PathChildrenCache newCache(CuratorFramework client, String path, final BlockingQueue<Event> events) throws Exception
+    {
+        PathChildrenCache childrenCache = new PathChildrenCache(client, path, false);
+        childrenCache.getListenable().addListener(new PathChildrenCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+            {
+                EventType recorded = toEventType(event.getType());
+                if ( recorded != null )
+                {
+                    // the event data is only read for add/remove notifications
+                    events.add(new Event(recorded, event.getData().getPath()));
+                }
+            }
+        });
+        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+        return childrenCache;
+    }
+
+    /**
+     * Maps a cache event type to the test's event type, or {@code null} when the
+     * notification is neither an add nor a remove.
+     */
+    private static EventType toEventType(PathChildrenCacheEvent.Type type)
+    {
+        if ( type == PathChildrenCacheEvent.Type.CHILD_ADDED )
+        {
+            return EventType.ADDED;
+        }
+        if ( type == PathChildrenCacheEvent.Type.CHILD_REMOVED )
+        {
+            return EventType.DELETED;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/ee34e2b7/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
new file mode 100644
index 0000000..4a306d6
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCacheEventOrdering.java
@@ -0,0 +1,40 @@
+package org.apache.curator.framework.recipes.cache;
+
+import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Runs the shared event-ordering test against {@link TreeCache}.
+ */
+public class TestTreeCacheEventOrdering extends TestEventOrdering<TreeCache>
+{
+    /**
+     * @param cache the cache created by {@link #newCache}
+     * @return number of children the cache currently reports for {@code /root}
+     */
+    @Override
+    protected int getActualQty(TreeCache cache)
+    {
+        return cache.getCurrentChildren("/root").size();
+    }
+
+    /**
+     * Builds a {@link TreeCache} on {@code path}, attaches a listener that records every
+     * NODE_ADDED / NODE_REMOVED notification for nodes below {@code /root/} into
+     * {@code events}, and starts the cache.
+     *
+     * @param client started client to build the cache with
+     * @param path root path of the cached tree
+     * @param events sink receiving one {@link Event} per add/remove notification
+     * @return the started cache
+     * @throws Exception if the cache cannot be started
+     */
+    @Override
+    protected TreeCache newCache(CuratorFramework client, String path, final BlockingQueue<Event> events) throws Exception
+    {
+        TreeCache treeCache = new TreeCache(client, path);
+        treeCache.getListenable().addListener(new TreeCacheListener()
+        {
+            @Override
+            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception
+            {
+                // notifications without node data are skipped
+                if ( event.getData() == null )
+                {
+                    return;
+                }
+                // only nodes strictly below /root count; the root node itself is skipped
+                String nodePath = event.getData().getPath();
+                if ( !nodePath.startsWith("/root/") )
+                {
+                    return;
+                }
+                EventType recorded = toEventType(event.getType());
+                if ( recorded != null )
+                {
+                    events.add(new Event(recorded, nodePath));
+                }
+            }
+        });
+        treeCache.start();
+        return treeCache;
+    }
+
+    /**
+     * Maps a cache event type to the test's event type, or {@code null} when the
+     * notification is neither an add nor a remove.
+     */
+    private static EventType toEventType(TreeCacheEvent.Type type)
+    {
+        if ( type == TreeCacheEvent.Type.NODE_ADDED )
+        {
+            return EventType.ADDED;
+        }
+        if ( type == TreeCacheEvent.Type.NODE_REMOVED )
+        {
+            return EventType.DELETED;
+        }
+        return null;
+    }
+}