You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/05 16:20:42 UTC

git commit: CURATOR-21 PathChildrenCache currently takes an Executor, but only to fire off a Runnable that does a blocking while loop waiting for work. This means that you must have one thread per PathChildrenCache, which is not that great.

Updated Branches:
  refs/heads/CURATOR-21 [created] 75b540414


CURATOR-21
PathChildrenCache currently takes an Executor, but only to fire off a Runnable that does a blocking while loop waiting for work. This means that you must have one thread per PathChildrenCache, which is not that great.

PathChildrenCache should just use the Executor's work queuing mechanism to enqueue work items instead of maintaining its own work queue mechanism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/75b54041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/75b54041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/75b54041

Branch: refs/heads/CURATOR-21
Commit: 75b540414f860446f106a1cdbdea121046be2e98
Parents: 97cda39
Author: randgalt <ra...@apache.org>
Authored: Sun May 5 07:20:02 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Sun May 5 07:20:02 2013 -0700

----------------------------------------------------------------------
 .../apache/curator/framework/CuratorFramework.java |    3 +
 .../framework/recipes/cache/PathChildrenCache.java |  132 +++++----
 .../recipes/cache/PathChildrenCacheEvent.java      |   21 ++
 .../recipes/cache/TestPathChildrenCache.java       |  251 +++++++++++++++
 4 files changed, 350 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 7938b2e..0ca9b23 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -74,6 +74,9 @@ public interface CuratorFramework extends Closeable
     /**
      * Start an exists builder
      *
+     * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
+     * result means that the node does not exist and an actual Stat object means that it does exist.
+     *
      * @return builder object
      */
     public ExistsBuilder checkExists();

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..76efc6c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -46,12 +47,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -74,12 +73,20 @@ public class PathChildrenCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
-    private final BlockingQueue<Operation> operations = new LinkedBlockingQueue<Operation>();
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
+    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
 
-    private static final ChildData      NULL_CHILD_DATA = new ChildData(null, null, null);
+    private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
 
     private final Watcher childrenWatcher = new Watcher()
     {
@@ -217,8 +224,8 @@ public class PathChildrenCache implements Closeable
      * @param buildInitial if true, {@link #rebuild()} will be called before this method
      *                     returns in order to get an initial view of the node; otherwise,
      *                     the cache will be initialized asynchronously
-     * @deprecated use {@link #start(StartMode)}
      * @throws Exception errors
+     * @deprecated use {@link #start(StartMode)}
      */
     public void start(boolean buildInitial) throws Exception
     {
@@ -257,21 +264,10 @@ public class PathChildrenCache implements Closeable
      */
     public void start(StartMode mode) throws Exception
     {
-        Preconditions.checkState(!executorService.isShutdown(), "already started");
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
-        executorService.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        mainLoop();
-                    }
-                }
-            );
 
         switch ( mode )
         {
@@ -354,10 +350,11 @@ public class PathChildrenCache implements Closeable
     @Override
     public void close() throws IOException
     {
-        Preconditions.checkState(!executorService.isShutdown(), "has not been started");
-
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
-        executorService.shutdownNow();
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            executorService.shutdownNow();
+        }
     }
 
     /**
@@ -623,17 +620,17 @@ public class PathChildrenCache implements Closeable
     private void processChildren(List<String> children, RefreshMode mode) throws Exception
     {
         List<String> fullPaths = Lists.newArrayList(Lists.transform
-        (
-            children,
-            new Function<String, String>()
-            {
-                @Override
-                public String apply(String child)
+            (
+                children,
+                new Function<String, String>()
                 {
-                    return ZKPaths.makePath(path, child);
+                    @Override
+                    public String apply(String child)
+                    {
+                        return ZKPaths.makePath(path, child);
+                    }
                 }
-            }
-        ));
+            ));
         Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
         removedNodes.removeAll(fullPaths);
 
@@ -714,43 +711,64 @@ public class PathChildrenCache implements Closeable
         }
 
         Map<String, ChildData> uninitializedChildren = Maps.filterValues
-        (
-            localInitialSet,
-            new Predicate<ChildData>()
-            {
-                @Override
-                public boolean apply(ChildData input)
+            (
+                localInitialSet,
+                new Predicate<ChildData>()
                 {
-                    return (input == NULL_CHILD_DATA);  // check against ref intentional
+                    @Override
+                    public boolean apply(ChildData input)
+                    {
+                        return (input == NULL_CHILD_DATA);  // check against ref intentional
+                    }
                 }
-            }
-        );
+            );
         return (uninitializedChildren.size() != 0);
     }
 
-    private void mainLoop()
+    private void offerOperation(final Operation operation)
     {
-        while ( !Thread.currentThread().isInterrupted() )
+        if ( operationsQuantizer.add(operation) )
         {
-            try
-            {
-                operations.take().invoke();
-            }
-            catch ( InterruptedException e )
-            {
-                Thread.currentThread().interrupt();
-                break;
-            }
-            catch ( Exception e )
-            {
-                handleException(e);
-            }
+            submitToExecutor
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            operationsQuantizer.remove(operation);
+                            operation.invoke();
+                        }
+                        catch ( Exception e )
+                        {
+                            handleException(e);
+                        }
+                    }
+                }
+            );
         }
     }
 
-    private void offerOperation(Operation operation)
+    /**
+     * Submits a runnable to the executor.
+     * <p/>
+     * This method is synchronized because it has to check state about whether this instance is still open.  Without this check
+     * there is a race condition with the dataWatchers that get set.  Even after this object is closed() it can still be
+     * called by those watchers, because the close() method cannot actually disable the watcher.
+     * <p/>
+     * The synchronization overhead should be minimal, if not non-existent, as this is generally only called from the
+     * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
+     * we want to protect from.
+     *
+     * @param command The runnable to run
+     */
+    private synchronized void submitToExecutor(final Runnable command)
     {
-        operations.remove(operation);   // avoids herding for refresh operations
-        operations.offer(operation);
+        if ( state.get() == State.STARTED )
+        {
+            executorService.execute(command);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
index e85d509..f1c10f8 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
@@ -51,16 +51,37 @@ public class PathChildrenCacheEvent
 
         /**
          * Called when the connection has changed to {@link ConnectionState#SUSPENDED}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_SUSPENDED,
 
         /**
          * Called when the connection has changed to {@link ConnectionState#RECONNECTED}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_RECONNECTED,
 
         /**
          * Called when the connection has changed to {@link ConnectionState#LOST}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_LOST,
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index e51125b..7fed5f8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
@@ -31,6 +32,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -728,4 +730,253 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.close();
         }
     }
+
+    @Test
+    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            final ExecutorService exec = new ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
+                    {
+                        @Override
+                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") )
+                            {
+                                events.offer(event.getType());
+                            }
+                        }
+                    }
+                );
+            cache.start();
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
+            cache2.getListenable().addListener(
+                    new PathChildrenCacheListener() {
+                        @Override
+                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                                throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") )
+                            {
+                                events2.offer(event.getType());
+                            }
+                        }
+                    }
+            );
+            cache2.start();
+
+            client.create().forPath("/test/one", "hey there".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+            client.setData().forPath("/test/one", "sup!".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+            client.delete().forPath("/test/one");
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+
+            cache.close();
+            cache2.close();
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor()
+            throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
+
+            cache.start();
+            client.create().forPath("/test/one", "hey there".getBytes());
+
+            cache.rebuild();
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+            Assert.assertTrue(exec.isExecuteCalled());
+
+            exec.setExecuteCalled(false);
+            cache.close();
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            client.delete().forPath("/test/one");
+            Thread.sleep(100);
+            Assert.assertFalse(exec.isExecuteCalled());
+        }
+        finally {
+            client.close();
+        }
+
+    }
+
+    public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
+    {
+        boolean executeCalled = false;
+
+        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public synchronized void execute(Runnable command)
+        {
+            executeCalled = true;
+            super.execute(command);
+        }
+
+        public synchronized boolean isExecuteCalled()
+        {
+            return executeCalled;
+        }
+
+        public synchronized void setExecuteCalled(boolean executeCalled)
+        {
+            this.executeCalled = executeCalled;
+        }
+    }
+
+    /**
+     * This is required to work around https://issues.apache.org/jira/browse/CURATOR-17
+     */
+    public static class ShutdownNowIgnoringExecutorService extends DelegatingExecutorService
+    {
+        public ShutdownNowIgnoringExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public void shutdown()
+        {
+            // ignore
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            // ignore
+            return ImmutableList.of();
+        }
+    }
+
+    public static class DelegatingExecutorService implements ExecutorService
+    {
+        private final ExecutorService delegate;
+
+        public DelegatingExecutorService(
+                ExecutorService delegate
+        )
+        {
+            this.delegate = delegate;
+        }
+
+
+        @Override
+        public void shutdown()
+        {
+            delegate.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            return delegate.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown()
+        {
+            return delegate.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+            return delegate.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result)
+        {
+            return delegate.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException, ExecutionException
+        {
+            return delegate.invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException
+        {
+            return delegate.invokeAny(tasks, timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command)
+        {
+            delegate.execute(command);
+        }
+    }
 }