You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/17 22:39:11 UTC

[01/12] git commit: CURATOR-21 PathChildrenCache currently takes an Executor, but only to fire off a Runnable that does a blocking while loop waiting for work. This means that you must have one thread per PathChildrenCache, which is not that great.

Updated Branches:
  refs/heads/master 9eff1cf0e -> 6ac0133ce


CURATOR-21
PathChildrenCache currently takes an Executor, but only to fire off a Runnable that does a blocking while loop waiting for work. This means that you must have one thread per PathChildrenCache, which is not that great.

PathChildrenCache should just use the Executor's work queuing mechanism to enqueue work items instead of maintaining its own work queue mechanism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/75b54041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/75b54041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/75b54041

Branch: refs/heads/master
Commit: 75b540414f860446f106a1cdbdea121046be2e98
Parents: 97cda39
Author: randgalt <ra...@apache.org>
Authored: Sun May 5 07:20:02 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Sun May 5 07:20:02 2013 -0700

----------------------------------------------------------------------
 .../apache/curator/framework/CuratorFramework.java |    3 +
 .../framework/recipes/cache/PathChildrenCache.java |  132 +++++----
 .../recipes/cache/PathChildrenCacheEvent.java      |   21 ++
 .../recipes/cache/TestPathChildrenCache.java       |  251 +++++++++++++++
 4 files changed, 350 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 7938b2e..0ca9b23 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -74,6 +74,9 @@ public interface CuratorFramework extends Closeable
     /**
      * Start an exists builder
      *
+     * The builder will return a Stat object as if org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
+     * means that it does not exist and an actual Stat object means it does exist.
+     *
      * @return builder object
      */
     public ExistsBuilder checkExists();

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..76efc6c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -46,12 +47,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -74,12 +73,20 @@ public class PathChildrenCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
-    private final BlockingQueue<Operation> operations = new LinkedBlockingQueue<Operation>();
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
+    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
 
-    private static final ChildData      NULL_CHILD_DATA = new ChildData(null, null, null);
+    private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
 
     private final Watcher childrenWatcher = new Watcher()
     {
@@ -217,8 +224,8 @@ public class PathChildrenCache implements Closeable
      * @param buildInitial if true, {@link #rebuild()} will be called before this method
      *                     returns in order to get an initial view of the node; otherwise,
      *                     the cache will be initialized asynchronously
-     * @deprecated use {@link #start(StartMode)}
      * @throws Exception errors
+     * @deprecated use {@link #start(StartMode)}
      */
     public void start(boolean buildInitial) throws Exception
     {
@@ -257,21 +264,10 @@ public class PathChildrenCache implements Closeable
      */
     public void start(StartMode mode) throws Exception
     {
-        Preconditions.checkState(!executorService.isShutdown(), "already started");
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
-        executorService.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        mainLoop();
-                    }
-                }
-            );
 
         switch ( mode )
         {
@@ -354,10 +350,11 @@ public class PathChildrenCache implements Closeable
     @Override
     public void close() throws IOException
     {
-        Preconditions.checkState(!executorService.isShutdown(), "has not been started");
-
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
-        executorService.shutdownNow();
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.getConnectionStateListenable().removeListener(connectionStateListener);
+            executorService.shutdownNow();
+        }
     }
 
     /**
@@ -623,17 +620,17 @@ public class PathChildrenCache implements Closeable
     private void processChildren(List<String> children, RefreshMode mode) throws Exception
     {
         List<String> fullPaths = Lists.newArrayList(Lists.transform
-        (
-            children,
-            new Function<String, String>()
-            {
-                @Override
-                public String apply(String child)
+            (
+                children,
+                new Function<String, String>()
                 {
-                    return ZKPaths.makePath(path, child);
+                    @Override
+                    public String apply(String child)
+                    {
+                        return ZKPaths.makePath(path, child);
+                    }
                 }
-            }
-        ));
+            ));
         Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
         removedNodes.removeAll(fullPaths);
 
@@ -714,43 +711,64 @@ public class PathChildrenCache implements Closeable
         }
 
         Map<String, ChildData> uninitializedChildren = Maps.filterValues
-        (
-            localInitialSet,
-            new Predicate<ChildData>()
-            {
-                @Override
-                public boolean apply(ChildData input)
+            (
+                localInitialSet,
+                new Predicate<ChildData>()
                 {
-                    return (input == NULL_CHILD_DATA);  // check against ref intentional
+                    @Override
+                    public boolean apply(ChildData input)
+                    {
+                        return (input == NULL_CHILD_DATA);  // check against ref intentional
+                    }
                 }
-            }
-        );
+            );
         return (uninitializedChildren.size() != 0);
     }
 
-    private void mainLoop()
+    private void offerOperation(final Operation operation)
     {
-        while ( !Thread.currentThread().isInterrupted() )
+        if ( operationsQuantizer.add(operation) )
         {
-            try
-            {
-                operations.take().invoke();
-            }
-            catch ( InterruptedException e )
-            {
-                Thread.currentThread().interrupt();
-                break;
-            }
-            catch ( Exception e )
-            {
-                handleException(e);
-            }
+            submitToExecutor
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            operationsQuantizer.remove(operation);
+                            operation.invoke();
+                        }
+                        catch ( Exception e )
+                        {
+                            handleException(e);
+                        }
+                    }
+                }
+            );
         }
     }
 
-    private void offerOperation(Operation operation)
+    /**
+     * Submits a runnable to the executor.
+     * <p/>
+     * This method is synchronized because it has to check state about whether this instance is still open.  Without this check
+     * there is a race condition with the dataWatchers that get set.  Even after this object is closed() it can still be
+     * called by those watchers, because the close() method cannot actually disable the watcher.
+     * <p/>
+     * The synchronization overhead should be minimal if not non-existent as this is generally only called from the
+     * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
+     * we want to protect from.
+     *
+     * @param command The runnable to run
+     */
+    private synchronized void submitToExecutor(final Runnable command)
     {
-        operations.remove(operation);   // avoids herding for refresh operations
-        operations.offer(operation);
+        if ( state.get() == State.STARTED )
+        {
+            executorService.execute(command);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
index e85d509..f1c10f8 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
@@ -51,16 +51,37 @@ public class PathChildrenCacheEvent
 
         /**
          * Called when the connection has changed to {@link ConnectionState#SUSPENDED}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_SUSPENDED,
 
         /**
          * Called when the connection has changed to {@link ConnectionState#RECONNECTED}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_RECONNECTED,
 
         /**
          * Called when the connection has changed to {@link ConnectionState#LOST}
+         *
+         * This is exposed so that users of the class can be notified of issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not expected to do anything special on this
+         * event, except for those people who want to cause some application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue to have its state from before it lost
+         * the connection and after the connection is restored, the PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the time that it was disconnected.
          */
         CONNECTION_LOST,
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index e51125b..7fed5f8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
@@ -31,6 +32,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -728,4 +730,253 @@ public class TestPathChildrenCache extends BaseClassForTests
             client.close();
         }
     }
+
+    @Test
+    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            final ExecutorService exec = new ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
+                    {
+                        @Override
+                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") )
+                            {
+                                events.offer(event.getType());
+                            }
+                        }
+                    }
+                );
+            cache.start();
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
+            cache2.getListenable().addListener(
+                    new PathChildrenCacheListener() {
+                        @Override
+                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+                                throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") )
+                            {
+                                events2.offer(event.getType());
+                            }
+                        }
+                    }
+            );
+            cache2.start();
+
+            client.create().forPath("/test/one", "hey there".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+            client.setData().forPath("/test/one", "sup!".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
+            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+            client.delete().forPath("/test/one");
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+
+            cache.close();
+            cache2.close();
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor()
+            throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
+
+            cache.start();
+            client.create().forPath("/test/one", "hey there".getBytes());
+
+            cache.rebuild();
+            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+            Assert.assertTrue(exec.isExecuteCalled());
+
+            exec.setExecuteCalled(false);
+            cache.close();
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            client.delete().forPath("/test/one");
+            Thread.sleep(100);
+            Assert.assertFalse(exec.isExecuteCalled());
+        }
+        finally {
+            client.close();
+        }
+
+    }
+
+    public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
+    {
+        boolean executeCalled = false;
+
+        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public synchronized void execute(Runnable command)
+        {
+            executeCalled = true;
+            super.execute(command);
+        }
+
+        public synchronized boolean isExecuteCalled()
+        {
+            return executeCalled;
+        }
+
+        public synchronized void setExecuteCalled(boolean executeCalled)
+        {
+            this.executeCalled = executeCalled;
+        }
+    }
+
+    /**
+     * This is required to work around https://issues.apache.org/jira/browse/CURATOR-17
+     */
+    public static class ShutdownNowIgnoringExecutorService extends DelegatingExecutorService
+    {
+        public ShutdownNowIgnoringExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public void shutdown()
+        {
+            // ignore
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            // ignore
+            return ImmutableList.of();
+        }
+    }
+
+    public static class DelegatingExecutorService implements ExecutorService
+    {
+        private final ExecutorService delegate;
+
+        public DelegatingExecutorService(
+                ExecutorService delegate
+        )
+        {
+            this.delegate = delegate;
+        }
+
+
+        @Override
+        public void shutdown()
+        {
+            delegate.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            return delegate.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown()
+        {
+            return delegate.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+            return delegate.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result)
+        {
+            return delegate.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException, ExecutionException
+        {
+            return delegate.invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException
+        {
+            return delegate.invokeAny(tasks, timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command)
+        {
+            delegate.execute(command);
+        }
+    }
 }


[06/12] git commit: CURATOR-18 There is a race between creating a node and deleting it. If LeaderLatch is closed while the node creation is in flight, the created node won't get deleted, and latch node will stay in ZK forever.

Posted by ra...@apache.org.
CURATOR-18
There is a race between creating a node and deleting it. If LeaderLatch is closed while the node creation is in flight, the created node won't get deleted, and latch node will stay in ZK forever.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/38f28b5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/38f28b5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/38f28b5c

Branch: refs/heads/master
Commit: 38f28b5c951fd341c9845a3538d52bb3508536e2
Parents: 3d6181c
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 17:00:43 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 17:00:43 2013 -0700

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java      |    9 ++++-
 .../framework/recipes/leader/TestLeaderLatch.java  |   30 +++++++++++++++
 2 files changed, 38 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/38f28b5c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 508ca7c..dc37386 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -396,7 +396,14 @@ public class LeaderLatch implements Closeable
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                 {
                     setNode(event.getName());
-                    getChildren();
+                    if ( state.get() == State.CLOSED )
+                    {
+                        setNode(null);
+                    }
+                    else
+                    {
+                        getChildren();
+                    }
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/38f28b5c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 58a61ee..4741e76 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -80,6 +80,36 @@ public class TestLeaderLatch extends BaseClassForTests
     }
 
     @Test
+    public void testCreateDeleteRace() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+
+            latch.debugResetWaitLatch = new CountDownLatch(1);
+
+            latch.start();
+            latch.close();
+
+            timing.sleepABit();
+
+            latch.debugResetWaitLatch.countDown();
+
+            timing.sleepABit();
+
+            Assert.assertEquals(client.getChildren().forPath(PATH_NAME).size(), 0);
+
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testLostConnection() throws Exception
     {
         final int PARTICIPANT_QTY = 10;


[05/12] git commit: CURATOR-22 Add a listener to LeaderLatch

Posted by ra...@apache.org.
CURATOR-22
Add a listener to LeaderLatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/3d6181ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/3d6181ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/3d6181ca

Branch: refs/heads/master
Commit: 3d6181cae984807bd2f89cecc2e5b55d0574a5b3
Parents: 9acf592
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 16:46:50 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 16:46:50 2013 -0700

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java      |  244 ++++++++++-----
 .../recipes/leader/LeaderLatchListener.java        |   46 +++
 .../framework/recipes/leader/TestLeaderLatch.java  |  181 ++++++++---
 3 files changed, 351 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 36a0636..508ca7c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.recipes.locks.LockInternals;
 import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -41,29 +44,31 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * <p>
- *     Abstraction to select a "leader" amongst multiple contenders in a group of JMVs connected to
- *     a Zookeeper cluster. If a group of N thread/processes contend for leadership one will
- *     randomly be assigned leader until it releases leadership at which time another one from the
- *     group will randomly be chosen
+ * Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected to
+ * a Zookeeper cluster. If a group of N threads/processes contend for leadership, one will
+ * randomly be assigned leader until it releases leadership, at which time another one from the
+ * group will randomly be chosen.
  * </p>
  */
 public class LeaderLatch implements Closeable
 {
-    private final Logger                                log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework                      client;
-    private final String                                latchPath;
-    private final String                                id;
-    private final AtomicReference<State>                state = new AtomicReference<State>(State.LATENT);
-    private final AtomicBoolean                         hasLeadership = new AtomicBoolean(false);
-    private final AtomicReference<String>               ourPath = new AtomicReference<String>();
-
-    private final ConnectionStateListener               listener = new ConnectionStateListener()
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final String latchPath;
+    private final String id;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
+    private final AtomicReference<String> ourPath = new AtomicReference<String>();
+    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
+
+    private final ConnectionStateListener listener = new ConnectionStateListener()
     {
         @Override
         public void stateChanged(CuratorFramework client, ConnectionState newState)
@@ -74,7 +79,7 @@ public class LeaderLatch implements Closeable
 
     private static final String LOCK_NAME = "latch-";
 
-    private static final LockInternalsSorter        sorter = new LockInternalsSorter()
+    private static final LockInternalsSorter sorter = new LockInternalsSorter()
     {
         @Override
         public String fixForSorting(String str, String lockName)
@@ -83,7 +88,7 @@ public class LeaderLatch implements Closeable
         }
     };
 
-    private enum State
+    public enum State
     {
         LATENT,
         STARTED,
@@ -91,7 +96,7 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client the client
+     * @param client    the client
      * @param latchPath the path for this leadership group
      */
     public LeaderLatch(CuratorFramework client, String latchPath)
@@ -100,9 +105,9 @@ public class LeaderLatch implements Closeable
     }
 
     /**
-     * @param client the client
+     * @param client    the client
      * @param latchPath the path for this leadership group
-     * @param id participant ID
+     * @param id        participant ID
      */
     public LeaderLatch(CuratorFramework client, String latchPath, String id)
     {
@@ -147,16 +152,60 @@ public class LeaderLatch implements Closeable
         finally
         {
             client.getConnectionStateListenable().removeListener(listener);
+            listeners.clear();
             setLeadership(false);
         }
     }
 
     /**
+     * Attaches a listener to this LeaderLatch
+     * <p/>
+     * Attaching the same listener multiple times is a noop from the second time on.
+     * <p/>
+     * This overload does not take an Executor. To choose the executor on which the listener's methods are run
+     * (for example a single-threaded executor, so that they are called in sequence), use
+     * {@link #addListener(LeaderLatchListener, Executor)} instead.
+     *
+     * @param listener the listener to attach
+     */
+    public void addListener(LeaderLatchListener listener)
+    {
+        listeners.addListener(listener);
+    }
+
+    /**
+     * Attaches a listener to this LeaderLatch
+     * <p/>
+     * Attaching the same listener multiple times is a noop from the second time on.
+     * <p/>
+     * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded
+     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
+     * them being called out of order you are welcome to use multiple threads.
+     *
+     * @param listener the listener to attach
+     * @param executor     An executor to run the methods for the listener on.
+     */
+    public void addListener(LeaderLatchListener listener, Executor executor)
+    {
+        listeners.addListener(listener, executor);
+    }
+
+    /**
+     * Removes a given listener from this LeaderLatch
+     *
+     * @param listener the listener to remove
+     */
+    public void removeListener(LeaderLatchListener listener)
+    {
+        listeners.removeListener(listener);
+    }
+
+    /**
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted} or {@linkplain #close() closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of three things happen:
@@ -166,7 +215,7 @@ public class LeaderLatch implements Closeable
      * the current thread</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul></p>
-     *
+     * <p/>
      * <p>If the current thread:
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -176,9 +225,9 @@ public class LeaderLatch implements Closeable
      * interrupted status is cleared.</p>
      *
      * @throws InterruptedException if the current thread is interrupted
-     *         while waiting
-     * @throws EOFException if the instance is {@linkplain #close() closed}
-     *         while waiting
+     *                              while waiting
+     * @throws EOFException         if the instance is {@linkplain #close() closed}
+     *                              while waiting
      */
     public void await() throws InterruptedException, EOFException
     {
@@ -199,10 +248,10 @@ public class LeaderLatch implements Closeable
      * <p>Causes the current thread to wait until this instance acquires leadership
      * unless the thread is {@linkplain Thread#interrupt interrupted},
      * the specified waiting time elapses or the instance is {@linkplain #close() closed}.</p>
-     *
+     * <p/>
      * <p>If this instance already is the leader then this method returns immediately
      * with the value {@code true}.</p>
-     *
+     * <p/>
      * <p>Otherwise the current
      * thread becomes disabled for thread scheduling purposes and lies
      * dormant until one of four things happen:
@@ -213,7 +262,7 @@ public class LeaderLatch implements Closeable
      * <li>The specified waiting time elapses.</li>
      * <li>The instance is {@linkplain #close() closed}</li>
      * </ul>
-     *
+     * <p/>
      * <p>If the current thread:
      * <ul>
      * <li>has its interrupted status set on entry to this method; or
@@ -221,29 +270,29 @@ public class LeaderLatch implements Closeable
      * </ul>
      * then {@link InterruptedException} is thrown and the current thread's
      * interrupted status is cleared.</p>
-     *
+     * <p/>
      * <p>If the specified waiting time elapses or the instance is {@linkplain #close() closed}
      * then the value {@code false} is returned.  If the time is less than or equal to zero, the method
      * will not wait at all.</p>
      *
      * @param timeout the maximum time to wait
-     * @param unit the time unit of the {@code timeout} argument
+     * @param unit    the time unit of the {@code timeout} argument
      * @return {@code true} if the count reached zero and {@code false}
      *         if the waiting time elapsed before the count reached zero or the instances was closed
      * @throws InterruptedException if the current thread is interrupted
-     *         while waiting
+     *                              while waiting
      */
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException
     {
-        long        waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
+        long waitNanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
 
         synchronized(this)
         {
             while ( (waitNanos > 0) && (state.get() == State.STARTED) && !hasLeadership.get() )
             {
-                long        startNanos = System.nanoTime();
+                long startNanos = System.nanoTime();
                 TimeUnit.NANOSECONDS.timedWait(this, waitNanos);
-                long        elapsed = System.nanoTime() - startNanos;
+                long elapsed = System.nanoTime() - startNanos;
                 waitNanos -= elapsed;
             }
         }
@@ -261,14 +310,27 @@ public class LeaderLatch implements Closeable
     }
 
     /**
+     * Returns this instance's current state. This is the only way to verify that the object has been closed before
+     * closing it again. If you try to close a latch multiple times, the close() method will throw an
+     * IllegalArgumentException, which callers often fail to catch (Closeables.closeQuietly() only looks for
+     * IOException).
+     *
+     * @return the state of the current instance
+     */
+    public State getState()
+    {
+        return state.get();
+    }
+
+    /**
      * <p>
-     *     Returns the set of current participants in the leader selection
+     * Returns the set of current participants in the leader selection
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return participants
@@ -282,20 +344,20 @@ public class LeaderLatch implements Closeable
 
     /**
      * <p>
-     *     Return the id for the current leader. If for some reason there is no
-     *     current leader, a dummy participant is returned.
+     * Return the id for the current leader. If for some reason there is no
+     * current leader, a dummy participant is returned.
      * </p>
-     *
+     * <p/>
      * <p>
-     *     <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
-     *     return a value that does not match {@link #hasLeadership()} as hasLeadership
-     *     uses a local field of the class.
+     * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
+     * return a value that does not match {@link #hasLeadership()} as hasLeadership
+     * uses a local field of the class.
      * </p>
      *
      * @return leader
      * @throws Exception ZK errors, interruptions, etc.
      */
-    public Participant      getLeader() throws Exception
+    public Participant getLeader() throws Exception
     {
         Collection<String> participantNodes = LockInternals.getParticipantNodes(client, latchPath, LOCK_NAME, sorter);
         return LeaderSelector.getLeader(client, participantNodes);
@@ -320,7 +382,7 @@ public class LeaderLatch implements Closeable
         setLeadership(false);
         setNode(null);
 
-        BackgroundCallback          callback = new BackgroundCallback()
+        BackgroundCallback callback = new BackgroundCallback()
         {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -347,9 +409,9 @@ public class LeaderLatch implements Closeable
 
     private void checkLeadership(List<String> children) throws Exception
     {
-        final String    localOurPath = ourPath.get();
-        List<String>    sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
-        int             ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
+        final String localOurPath = ourPath.get();
+        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
+        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
         if ( ourIndex < 0 )
         {
             log.error("Can't find our node. Resetting. Index: " + ourIndex);
@@ -361,7 +423,7 @@ public class LeaderLatch implements Closeable
         }
         else
         {
-            String          watchPath = sortedChildren.get(ourIndex - 1);
+            String watchPath = sortedChildren.get(ourIndex - 1);
             Watcher watcher = new Watcher()
             {
                 @Override
@@ -373,7 +435,7 @@ public class LeaderLatch implements Closeable
                         {
                             getChildren();
                         }
-                        catch(Exception ex)
+                        catch ( Exception ex )
                         {
                             log.error("An error occurred checking the leadership.", ex);
                         }
@@ -381,7 +443,7 @@ public class LeaderLatch implements Closeable
                 }
             };
 
-            BackgroundCallback          callback = new BackgroundCallback()
+            BackgroundCallback callback = new BackgroundCallback()
             {
                 @Override
                 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -399,7 +461,7 @@ public class LeaderLatch implements Closeable
 
     private void getChildren() throws Exception
     {
-        BackgroundCallback          callback = new BackgroundCallback()
+        BackgroundCallback callback = new BackgroundCallback()
         {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
@@ -417,44 +479,76 @@ public class LeaderLatch implements Closeable
     {
         switch ( newState )
         {
-            default:
-            {
-                // NOP
-                break;
-            }
+        default:
+        {
+            // NOP
+            break;
+        }
 
-            case RECONNECTED:
+        case RECONNECTED:
+        {
+            try
             {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    log.error("Could not reset leader latch", e);
-                    setLeadership(false);
-                }
-                break;
+                reset();
             }
-
-            case SUSPENDED:
-            case LOST:
+            catch ( Exception e )
             {
+                log.error("Could not reset leader latch", e);
                 setLeadership(false);
-                break;
             }
+            break;
+        }
+
+        case SUSPENDED:
+        case LOST:
+        {
+            setLeadership(false);
+            break;
+        }
         }
     }
 
     private synchronized void setLeadership(boolean newValue)
     {
-        hasLeadership.set(newValue);
+        boolean oldValue = hasLeadership.getAndSet(newValue);
+
+        if ( oldValue && !newValue )
+        { // Lost leadership, was true, now false
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener listener)
+                    {
+                        listener.notLeader();
+                        return null;
+                    }
+                }
+            );
+        }
+        else if ( !oldValue && newValue )
+        { // Gained leadership, was false, now true
+            listeners.forEach
+            (
+                new Function<LeaderLatchListener, Void>()
+                {
+                    @Override
+                    public Void apply(LeaderLatchListener input)
+                    {
+                        input.isLeader();
+                        return null;
+                    }
+                }
+            );
+        }
+
         notifyAll();
     }
 
     private void setNode(String newValue) throws Exception
     {
-        String      oldPath = ourPath.getAndSet(newValue);
+        String oldPath = ourPath.getAndSet(newValue);
         if ( oldPath != null )
         {
             client.delete().guaranteed().inBackground().forPath(oldPath);

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
new file mode 100644
index 0000000..68dd355
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatchListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.leader;
+
+/**
+ * A LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
+ *
+ * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
+ * hasLeadership() is the corresponding true/false value.  It is possible for the state to change behind the scenes
+ * before these methods get called.  The contract is that if that happens, you should see another call to the other
+ * method pretty quickly.
+ */
+public interface LeaderLatchListener
+{
+  /**
+   * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
+   *
+   * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false.  If
+   * this occurs, you can expect {@link #notLeader()} to also be called.
+   */
+  public void isLeader();
+
+  /**
+   * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
+   *
+   * Note that it is possible that by the time this method call happens, hasLeadership has become true.  If
+   * this occurs, you can expect {@link #isLeader()} to also be called.
+   */
+  public void notLeader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/3d6181ca/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 307c383..58a61ee 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -16,10 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.leader;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -39,6 +42,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class TestLeaderLatch extends BaseClassForTests
 {
@@ -88,7 +92,7 @@ public class TestLeaderLatch extends BaseClassForTests
         {
             client.start();
 
-            final CountDownLatch        countDownLatch = new CountDownLatch(1);
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
             client.getConnectionStateListenable().addListener
             (
                 new ConnectionStateListener()
@@ -136,47 +140,47 @@ public class TestLeaderLatch extends BaseClassForTests
     @Test
     public void testCorrectWatching() throws Exception
     {
-    	final int PARTICIPANT_QTY = 10;
-    	final int PARTICIPANT_ID = 2;
-    	
-    	List<LeaderLatch> latches = Lists.newArrayList();
+        final int PARTICIPANT_QTY = 10;
+        final int PARTICIPANT_ID = 2;
+
+        List<LeaderLatch> latches = Lists.newArrayList();
 
         final Timing timing = new Timing();
         final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         try
         {
-             client.start();
-
-             for ( int i = 0; i < PARTICIPANT_QTY; ++i )
-             {
-                 LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
-                 latch.start();
-                 latches.add(latch);
-             }
-
-             waitForALeader(latches, timing);
-             
-             //we need to close a Participant that doesn't be actual leader (first Participant) nor the last
-             latches.get(PARTICIPANT_ID).close();
-             
-             //As the previous algorithm assumed that if the watched node is deleted gets the leadership
-             //we need to ensure that the PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
-             Assert.assertTrue(!latches.get(PARTICIPANT_ID-1).hasLeadership());
-	     }
-	     finally
-	     {
-	    	 //removes the already closed participant
-	    	 latches.remove(PARTICIPANT_ID);
-	    	 
-	         for ( LeaderLatch latch : latches )
-	         {
-	             Closeables.closeQuietly(latch);
-	         }
-	         Closeables.closeQuietly(client);
-	     }
+            client.start();
+
+            for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+            {
+                LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+                latch.start();
+                latches.add(latch);
+            }
+
+            waitForALeader(latches, timing);
+
+            //we need to close a Participant that is neither the actual leader (first Participant) nor the last
+            latches.get(PARTICIPANT_ID).close();
+
+            //The previous algorithm assumed that a participant gets the leadership whenever its watched node is
+            //deleted, so we need to ensure that PARTICIPANT_ID-1 is not getting (wrongly) elected as leader.
+            Assert.assertTrue(!latches.get(PARTICIPANT_ID - 1).hasLeadership());
+        }
+        finally
+        {
+            //removes the already closed participant
+            latches.remove(PARTICIPANT_ID);
+
+            for ( LeaderLatch latch : latches )
+            {
+                Closeables.closeQuietly(latch);
+            }
+            Closeables.closeQuietly(client);
+        }
 
     }
-    
+
     @Test
     public void testWaiting() throws Exception
     {
@@ -244,6 +248,93 @@ public class TestLeaderLatch extends BaseClassForTests
         basic(Mode.START_IN_THREADS);
     }
 
+    @Test
+    public void testCallbackSanity() throws Exception
+    {
+        final int PARTICIPANT_QTY = 10;
+        final CountDownLatch timesSquare = new CountDownLatch(PARTICIPANT_QTY);
+        final AtomicLong masterCounter = new AtomicLong(0);
+        final AtomicLong dunceCounter = new AtomicLong(0);
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        ExecutorService exec = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("callbackSanity-%s").build());
+
+        List<LeaderLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < PARTICIPANT_QTY; ++i )
+        {
+            final LeaderLatch latch = new LeaderLatch(client, PATH_NAME);
+            latch.addListener(
+                new LeaderLatchListener()
+                {
+                    boolean beenLeader = false;
+
+                    @Override
+                    public void isLeader()
+                    {
+                        if ( !beenLeader )
+                        {
+                            masterCounter.incrementAndGet();
+                            beenLeader = true;
+                            try
+                            {
+                                latch.reset();
+                            }
+                            catch ( Exception e )
+                            {
+                                throw Throwables.propagate(e);
+                            }
+                        }
+                        else
+                        {
+                            masterCounter.incrementAndGet();
+                            Closeables.closeQuietly(latch);
+                            timesSquare.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void notLeader()
+                    {
+                        dunceCounter.incrementAndGet();
+                    }
+                },
+                exec
+            );
+            latches.add(latch);
+        }
+
+        try
+        {
+            client.start();
+
+            for ( LeaderLatch latch : latches )
+            {
+                latch.start();
+            }
+
+            timesSquare.await();
+
+            Assert.assertEquals(masterCounter.get(), PARTICIPANT_QTY * 2);
+            Assert.assertEquals(dunceCounter.get(), PARTICIPANT_QTY);
+            for ( LeaderLatch latch : latches )
+            {
+                Assert.assertEquals(latch.getState(), LeaderLatch.State.CLOSED);
+            }
+        }
+        finally
+        {
+            for ( LeaderLatch latch : latches )
+            {
+                if ( latch.getState() != LeaderLatch.State.CLOSED )
+                {
+                    Closeables.closeQuietly(latch);
+                }
+            }
+            Closeables.closeQuietly(client);
+        }
+    }
+
     private enum Mode
     {
         START_IMMEDIATELY,
@@ -277,18 +368,18 @@ public class TestLeaderLatch extends BaseClassForTests
                 for ( final LeaderLatch latch : latches )
                 {
                     service.submit
-                    (
-                        new Callable<Object>()
-                        {
-                            @Override
-                            public Object call() throws Exception
+                        (
+                            new Callable<Object>()
                             {
-                                Thread.sleep((int)(100 * Math.random()));
-                                latch.start();
-                                return null;
+                                @Override
+                                public Object call() throws Exception
+                                {
+                                    Thread.sleep((int)(100 * Math.random()));
+                                    latch.start();
+                                    return null;
+                                }
                             }
-                        }
-                    );
+                        );
                 }
                 service.shutdown();
             }


[07/12] git commit: CURATOR-24 Improve the handling of hung ZooKeeper connections

Posted by ra...@apache.org.
CURATOR-24
Improve the handling of hung ZooKeeper connections


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/11ae23ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/11ae23ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/11ae23ad

Branch: refs/heads/master
Commit: 11ae23adc013fa6526ac001bce2f5b2967a1d9b5
Parents: 38f28b5
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 17:18:09 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 17:18:09 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/curator/ConnectionState.java   |  169 +++++++--------
 .../org/apache/curator/CuratorZookeeperClient.java |    9 -
 .../framework/state/ConnectionStateManager.java    |    5 -
 3 files changed, 84 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index 8de3e27..bbb0588 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -38,24 +38,23 @@ import java.util.concurrent.atomic.AtomicReference;
 
 class ConnectionState implements Watcher, Closeable
 {
-    private volatile long connectionStartMs = 0;
-
-    private final Logger                        log = LoggerFactory.getLogger(getClass());
-    private final HandleHolder                  zooKeeper;
-    private final AtomicBoolean                 isConnected = new AtomicBoolean(false);
-    private final AtomicBoolean                 lost = new AtomicBoolean(false);
-    private final EnsembleProvider              ensembleProvider;
-    private final int                           connectionTimeoutMs;
+    private static final int MAX_BACKGROUND_EXCEPTIONS = 10;
+    private static final boolean LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final HandleHolder zooKeeper;
+    private final AtomicBoolean isConnected = new AtomicBoolean(false);
+    private final EnsembleProvider ensembleProvider;
+    private final int sessionTimeoutMs;
+    private final int connectionTimeoutMs;
     private final AtomicReference<TracerDriver> tracer;
-    private final Queue<Exception>              backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
-    private final Queue<Watcher>                parentWatchers = new ConcurrentLinkedQueue<Watcher>();
-
-    private static final int        MAX_BACKGROUND_EXCEPTIONS = 10;
-    private static final boolean    LOG_EVENTS = Boolean.getBoolean(DebugUtils.PROPERTY_LOG_EVENTS);
+    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
+    private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
+    private volatile long connectionStartMs = 0;
 
     ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
     {
         this.ensembleProvider = ensembleProvider;
+        this.sessionTimeoutMs = sessionTimeoutMs;
         this.connectionTimeoutMs = connectionTimeoutMs;
         this.tracer = tracer;
         if ( parentWatcher != null )
@@ -73,12 +72,6 @@ class ConnectionState implements Watcher, Closeable
             throw new SessionFailRetryLoop.SessionFailedException();
         }
 
-        if ( lost.compareAndSet(true, false) )
-        {
-            log.info("resetting after loss");
-            reset();
-        }
-
         Exception exception = backgroundExceptions.poll();
         if ( exception != null )
         {
@@ -90,24 +83,7 @@ class ConnectionState implements Watcher, Closeable
         boolean localIsConnected = isConnected.get();
         if ( !localIsConnected )
         {
-            long        elapsed = System.currentTimeMillis() - connectionStartMs;
-            if ( elapsed >= connectionTimeoutMs )
-            {
-                if ( zooKeeper.hasNewConnectionString() )
-                {
-                    handleNewConnectionString();
-                }
-                else
-                {
-                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
-                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                    {
-                        log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
-                    }
-                    tracer.get().addCount("connections-timed-out", 1);
-                    throw connectionLossException;
-                }
-            }
+            checkTimeouts();
         }
 
         return zooKeeper.getZooKeeper();
@@ -118,7 +94,7 @@ class ConnectionState implements Watcher, Closeable
         return isConnected.get();
     }
 
-    void        start() throws Exception
+    void start() throws Exception
     {
         log.debug("Starting");
         ensembleProvider.start();
@@ -126,7 +102,7 @@ class ConnectionState implements Watcher, Closeable
     }
 
     @Override
-    public void        close() throws IOException
+    public void close() throws IOException
     {
         log.debug("Closing");
 
@@ -142,27 +118,19 @@ class ConnectionState implements Watcher, Closeable
         finally
         {
             isConnected.set(false);
-            lost.set(false);
         }
     }
 
-    void        addParentWatcher(Watcher watcher)
+    void addParentWatcher(Watcher watcher)
     {
         parentWatchers.offer(watcher);
     }
 
-    void        removeParentWatcher(Watcher watcher)
+    void removeParentWatcher(Watcher watcher)
     {
         parentWatchers.remove(watcher);
     }
 
-    void markLost()
-    {
-        log.info("lost marked");
-
-        lost.set(true);
-    }
-
     @Override
     public void process(WatchedEvent event)
     {
@@ -188,10 +156,6 @@ class ConnectionState implements Watcher, Closeable
         if ( newIsConnected != wasConnected )
         {
             isConnected.set(newIsConnected);
-            if ( newIsConnected )
-            {
-                lost.set(false);
-            }
             connectionStartMs = System.currentTimeMillis();
         }
     }
@@ -201,6 +165,41 @@ class ConnectionState implements Watcher, Closeable
         return ensembleProvider;
     }
 
+    private synchronized void checkTimeouts() throws Exception
+    {
+        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+        long elapsed = System.currentTimeMillis() - connectionStartMs;
+        if ( elapsed >= minTimeout )
+        {
+            if ( zooKeeper.hasNewConnectionString() )
+            {
+                handleNewConnectionString();
+            }
+            else
+            {
+                int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs);
+                if ( elapsed > maxTimeout )
+                {
+                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                    {
+                        log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout));
+                    }
+                    reset();
+                }
+                else
+                {
+                    KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
+                    if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
+                    {
+                        log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
+                    }
+                    tracer.get().addCount("connections-timed-out", 1);
+                    throw connectionLossException;
+                }
+            }
+        }
+    }
+
     private synchronized void reset() throws Exception
     {
         log.debug("reset");
@@ -213,44 +212,44 @@ class ConnectionState implements Watcher, Closeable
 
     private boolean checkState(Event.KeeperState state, boolean wasConnected)
     {
-        boolean     isConnected = wasConnected;
-        boolean     checkNewConnectionString = true;
+        boolean isConnected = wasConnected;
+        boolean checkNewConnectionString = true;
         switch ( state )
         {
-            default:
-            case Disconnected:
-            {
-                isConnected = false;
-                break;
-            }
+        default:
+        case Disconnected:
+        {
+            isConnected = false;
+            break;
+        }
 
-            case SyncConnected:
-            case ConnectedReadOnly:
-            {
-                isConnected = true;
-                break;
-            }
+        case SyncConnected:
+        case ConnectedReadOnly:
+        {
+            isConnected = true;
+            break;
+        }
 
-            case AuthFailed:
-            {
-                isConnected = false;
-                log.error("Authentication failed");
-                break;
-            }
+        case AuthFailed:
+        {
+            isConnected = false;
+            log.error("Authentication failed");
+            break;
+        }
 
-            case Expired:
-            {
-                isConnected = false;
-                checkNewConnectionString = false;
-                handleExpiredSession();
-                break;
-            }
+        case Expired:
+        {
+            isConnected = false;
+            checkNewConnectionString = false;
+            handleExpiredSession();
+            break;
+        }
 
-            case SaslAuthenticated:
-            {
-                // NOP
-                break;
-            }
+        case SaslAuthenticated:
+        {
+            // NOP
+            break;
+        }
         }
 
         if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index be63a9b..f633fe1 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -184,15 +184,6 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
-     * Mark the connection as lost. The next time {@link #getZooKeeper()} is called,
-     * a new connection will be created.
-     */
-    public void markLost()
-    {
-        state.markLost();
-    }
-
-    /**
      * Close the client
      */
     public void     close()

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/11ae23ad/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 8f5d9ff..2ed60c1 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -141,11 +141,6 @@ public class ConnectionStateManager implements Closeable
             return;
         }
 
-        if ( newState == ConnectionState.LOST )
-        {
-            client.getZookeeperClient().markLost();
-        }
-
         ConnectionState     previousState = currentState.getAndSet(newState);
         if ( previousState == newState )
         {


[08/12] git commit: Don't throw if future can't be canceled. Just log it

Posted by ra...@apache.org.
Don't throw if future can't be canceled. Just log it


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/601bc4c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/601bc4c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/601bc4c9

Branch: refs/heads/master
Commit: 601bc4c93229bea70bd4d24121901d36394d6cd5
Parents: 10df9fc
Author: randgalt <ra...@apache.org>
Authored: Fri May 10 19:02:21 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Fri May 10 19:02:21 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |    6 ++++--
 1 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/601bc4c9/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index 4024d29..74c6d28 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -4,6 +4,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.Iterator;
 import java.util.Set;
@@ -20,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class CloseableExecutorService implements Closeable
 {
+    private final Logger log = LoggerFactory.getLogger(CloseableExecutorService.class);
     private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
     private final ExecutorService executorService;
     protected final AtomicBoolean isOpen = new AtomicBoolean(true);
@@ -79,8 +82,7 @@ public class CloseableExecutorService implements Closeable
             iterator.remove();
             if ( !future.cancel(true) )
             {
-                System.err.println("Could not cancel");
-                throw new RuntimeException("Could not cancel");
+                log.warn("Could not cancel " + future);
             }
         }
     }


[02/12] git commit: temp checkin

Posted by ra...@apache.org.
temp checkin


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86b82ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86b82ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86b82ab6

Branch: refs/heads/master
Commit: 86b82ab61d951429121e2f869bc00ff4cede9407
Parents: 97cda39
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 11:56:19 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 11:56:19 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |   28 ++
 .../utils/CloseableExecutorServiceBase.java        |  124 +++++++
 .../utils/CloseableScheduledExecutorService.java   |  100 +++++
 .../org/apache/curator/utils/FutureContainer.java  |   91 +++++
 .../utils/TestCloseableExecutorService.java        |  282 +++++++++++++++
 .../framework/recipes/cache/PathChildrenCache.java |   25 +-
 .../curator/framework/recipes/locks/Reaper.java    |   14 +-
 7 files changed, 642 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
new file mode 100644
index 0000000..cf92ef4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -0,0 +1,28 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+public class CloseableExecutorService extends CloseableExecutorServiceBase
+{
+    private final ListeningExecutorService executorService;
+
+    /**
+     * @param executorService the service to decorate
+     */
+    public CloseableExecutorService(ExecutorService executorService)
+    {
+        this.executorService = MoreExecutors.listeningDecorator(executorService);
+    }
+
+    @Override
+    protected ListeningExecutorService getService()
+    {
+        return executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
new file mode 100644
index 0000000..92371d7
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
@@ -0,0 +1,124 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+abstract class CloseableExecutorServiceBase implements Closeable
+{
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    protected abstract ListeningExecutorService getService();
+
+    @Override
+    public void close()
+    {
+        isClosed.set(true);
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            future.cancel(true);
+        }
+    }
+
+    /**
+     * @see ExecutorService#isShutdown()
+     * @return true/false
+     */
+    public boolean isShutdown()
+    {
+        return getService().isShutdown();
+    }
+
+    /**
+     * @see ExecutorService#isTerminated()
+     * @return true/false
+     */
+    public boolean isTerminated()
+    {
+        return getService().isTerminated();
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Callable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return record(getService().submit(task));
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Runnable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public Future<?> submit(Runnable task)
+    {
+        return record(getService().submit(task));
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
+    {
+        if ( isClosed.get() )
+        {
+            future.cancel(true);
+        }
+        else
+        {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    protected <T> Future<T> record(final ListenableFuture<T> future)
+    {
+        Runnable listener = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                futures.remove(future);
+            }
+        };
+        if ( isClosed.get() )
+        {
+            future.cancel(true);
+        }
+        else
+        {
+            futures.add(future);
+            future.addListener(listener, MoreExecutors.sameThreadExecutor());
+        }
+        return future;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
new file mode 100644
index 0000000..8638ee6
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -0,0 +1,100 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+{
+    private final ListeningScheduledExecutorService executorService;
+
+    /**
+     * @param executorService the service to decorate
+     */
+    public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+    {
+        this.executorService = MoreExecutors.listeningDecorator(executorService);
+    }
+
+    @Override
+    protected ListeningExecutorService getService()
+    {
+        return executorService;
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose <tt>get()</tt> method will return
+     *         <tt>null</tt> upon completion
+     */
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        return record(executorService.schedule(command, delay, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param callable the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose <tt>get()</tt> method will return
+     *         <tt>null</tt> upon completion
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+    {
+        return record(executorService.schedule(callable, delay, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param delay the delay between the termination of one
+     * execution and the commencement of the next
+     * @param unit the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
new file mode 100644
index 0000000..51fe6a4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
@@ -0,0 +1,91 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+public class FutureContainer implements Closeable
+{
+    private final List<Future<?>> futures = Lists.newArrayList();
+    private final ExecutorService executorService;
+
+    private class QueueingFuture<T> extends FutureTask<T>
+    {
+        private final RunnableFuture<T> task;
+
+        QueueingFuture(RunnableFuture<T> task)
+        {
+            super(task, null);
+            this.task = task;
+            futures.add(task);
+        }
+
+        protected void done()
+        {
+            futures.remove(task);
+        }
+    }
+
+    public FutureContainer(ExecutorService executorService)
+    {
+        this.executorService = executorService;
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    @Override
+    public void close()
+    {
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            if ( !future.cancel(true) )
+            {
+                System.err.println("Could not cancel");
+                throw new RuntimeException("Could not cancel");
+            }
+        }
+    }
+
+    /**
+     * Submits a value-returning task for execution. The task's future is
+     * tracked by this container until it completes and is cancelled if the
+     * container is closed first. Nothing is returned to the caller.
+     *
+     * @param task the task to submit
+     */
+    public<V> void submit(Callable<V> task)
+    {
+        FutureTask<V> futureTask = new FutureTask<V>(task);
+        executorService.execute(new QueueingFuture<V>(futureTask));
+    }
+
+    /**
+     * Submits a Runnable task for execution. The task's future is
+     * tracked by this container until it completes and is cancelled
+     * if the container is closed first. Nothing is returned.
+     *
+     * @param task the task to submit
+     */
+    public void submit(Runnable task)
+    {
+        FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
+        executorService.execute(new QueueingFuture<Void>(futureTask));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
new file mode 100644
index 0000000..2cd2901
--- /dev/null
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -0,0 +1,282 @@
+package org.apache.curator.utils;
+
+import com.google.common.collect.Lists;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCloseableExecutorService
+{
+    private static final int QTY = 10;
+
+    private volatile ExecutorService executorService;
+    private volatile AtomicInteger count;
+
+    @BeforeMethod
+    public void setup()
+    {
+        executorService = Executors.newFixedThreadPool(QTY * 2);
+        count = new AtomicInteger(0);
+    }
+
+    @AfterMethod
+    public void tearDown()
+    {
+        executorService.shutdownNow();
+    }
+
+    @Test
+    public void testBasicRunnable() throws InterruptedException
+    {
+        try
+        {
+            FutureContainer service = new FutureContainer(executorService);
+            CountDownLatch startLatch = new CountDownLatch(QTY);
+            CountDownLatch latch = new CountDownLatch(QTY);
+            for ( int i = 0; i < QTY; ++i )
+            {
+                submitRunnable(service, startLatch, latch);
+            }
+
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            service.close();
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+        }
+        catch ( AssertionError e )
+        {
+            throw e;
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testBasicCallable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<CountDownLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            latches.add(latch);
+            service.submit
+                (
+                    new Callable<Void>()
+                    {
+                        @Override
+                        public Void call() throws Exception
+                        {
+                            try
+                            {
+                                Thread.currentThread().join();
+                            }
+                            catch ( InterruptedException e )
+                            {
+                                Thread.currentThread().interrupt();
+                            }
+                            finally
+                            {
+                                latch.countDown();
+                            }
+                            return null;
+                        }
+                    }
+                );
+        }
+
+        service.close();
+        for ( CountDownLatch latch : latches )
+        {
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test
+    public void testListeningRunnable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<Future<?>> futures = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            Future<?> future = service.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            );
+            futures.add(future);
+        }
+
+        Thread.sleep(100);
+
+        for ( Future<?> future : futures )
+        {
+            future.cancel(true);
+        }
+
+        Assert.assertEquals(service.size(), 0);
+    }
+
+    @Test
+    public void testListeningCallable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<Future<?>> futures = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            Future<?> future = service.submit
+            (
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        return null;
+                    }
+                }
+            );
+            futures.add(future);
+        }
+
+        Thread.sleep(100);
+
+        for ( Future<?> future : futures )
+        {
+            future.cancel(true);
+        }
+
+        Assert.assertEquals(service.size(), 0);
+    }
+
+    @Test
+    public void testPartialRunnable() throws InterruptedException
+    {
+        try
+        {
+            final CountDownLatch outsideLatch = new CountDownLatch(1);
+            executorService.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        finally
+                        {
+                            outsideLatch.countDown();
+                        }
+                    }
+                }
+            );
+
+            FutureContainer service = new FutureContainer(executorService);
+            CountDownLatch startLatch = new CountDownLatch(QTY);
+            CountDownLatch latch = new CountDownLatch(QTY);
+            for ( int i = 0; i < QTY; ++i )
+            {
+                submitRunnable(service, startLatch, latch);
+            }
+
+            while ( service.size() < QTY )
+            {
+                Thread.sleep(100);
+            }
+
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            service.close();
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertEquals(outsideLatch.getCount(), 1);
+        }
+        catch ( AssertionError e )
+        {
+            throw e;
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            executorService.shutdownNow();
+        }
+    }
+
+    private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+    {
+        try
+        {
+            service.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            startLatch.countDown();
+                            count.incrementAndGet();
+                            Thread.sleep(100000);
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        catch ( Throwable e )
+                        {
+                            e.printStackTrace();
+                        }
+                        finally
+                        {
+    //                        count.decrementAndGet();
+                            latch.countDown();
+                        }
+                    }
+                }
+            );
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..f42039c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
@@ -70,7 +71,7 @@ public class PathChildrenCache implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String path;
-    private final ExecutorService executorService;
+    private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
@@ -197,7 +198,7 @@ public class PathChildrenCache implements Closeable
         this.path = path;
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
-        this.executorService = executorService;
+        this.executorService = new CloseableExecutorService(executorService);
         ensurePath = client.newNamespaceAwareEnsurePath(path);
     }
 
@@ -261,17 +262,17 @@ public class PathChildrenCache implements Closeable
         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
-        executorService.execute
-            (
-                new Runnable()
+        executorService.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
-                    {
-                        mainLoop();
-                    }
+                    mainLoop();
                 }
-            );
+            }
+        );
 
         switch ( mode )
         {
@@ -357,7 +358,7 @@ public class PathChildrenCache implements Closeable
         Preconditions.checkState(!executorService.isShutdown(), "has not been started");
 
         client.getConnectionStateListenable().removeListener(connectionStateListener);
-        executorService.shutdownNow();
+        executorService.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 11efefd..b540689 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -42,7 +43,7 @@ public class Reaper implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
-    private final ScheduledExecutorService executor;
+    private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap());
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -127,7 +128,7 @@ public class Reaper implements Closeable
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
     {
         this.client = client;
-        this.executor = executor;
+        this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
     }
 
@@ -181,14 +182,7 @@ public class Reaper implements Closeable
     {
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
-            try
-            {
-                executor.shutdownNow();
-            }
-            catch ( Exception e )
-            {
-                log.error("Canceling task", e);
-            }
+            executor.close();
         }
     }
 


[10/12] git commit: Added license headers

Posted by ra...@apache.org.
Added license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/70fcae0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/70fcae0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/70fcae0b

Branch: refs/heads/master
Commit: 70fcae0b075fbaa30639403a4d9e2dfec6d773c6
Parents: 6e3c9e2
Author: randgalt <ra...@apache.org>
Authored: Fri May 10 23:54:59 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Fri May 10 23:54:59 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |   19 +++++++++++++++
 .../utils/CloseableScheduledExecutorService.java   |   19 +++++++++++++++
 .../utils/TestCloseableExecutorService.java        |   19 +++++++++++++++
 3 files changed, 57 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/70fcae0b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index 74c6d28..8ff7b32 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.curator.utils;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/70fcae0b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 737ff6b..c2bea63 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.curator.utils;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/70fcae0b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
index 72b63fd..cb71116 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.curator.utils;
 
 import com.google.common.collect.Lists;


[12/12] git commit: [maven-release-plugin] prepare release apache-curator-2.0.1-incubating

Posted by ra...@apache.org.
[maven-release-plugin] prepare release apache-curator-2.0.1-incubating


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6ac0133c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6ac0133c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6ac0133c

Branch: refs/heads/master
Commit: 6ac0133ceeedd4dfdbc6cc1c35abc303bfd056d6
Parents: 39ca056
Author: randgalt <ra...@apache.org>
Authored: Fri May 17 13:39:07 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Fri May 17 13:39:07 2013 -0700

----------------------------------------------------------------------
 curator-client/pom.xml             |    4 ++--
 curator-examples/pom.xml           |    4 ++--
 curator-framework/pom.xml          |    4 ++--
 curator-recipes/pom.xml            |    4 ++--
 curator-test/pom.xml               |    4 ++--
 curator-x-discovery-server/pom.xml |    4 ++--
 curator-x-discovery/pom.xml        |    4 ++--
 pom.xml                            |   16 ++++++++--------
 8 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-client/pom.xml
----------------------------------------------------------------------
diff --git a/curator-client/pom.xml b/curator-client/pom.xml
index 97b4334..aa1001b 100644
--- a/curator-client/pom.xml
+++ b/curator-client/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-client</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Client</name>
     <description>Low-level API</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-examples/pom.xml
----------------------------------------------------------------------
diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml
index fd9ff8a..858c7da 100644
--- a/curator-examples/pom.xml
+++ b/curator-examples/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-examples</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Examples</name>
     <description>Example usages of various Curator features.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-framework/pom.xml
----------------------------------------------------------------------
diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index 6c3bd1d..e4ecff7 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Framework</name>
     <description>High-level API that greatly simplifies using ZooKeeper.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-recipes/pom.xml
----------------------------------------------------------------------
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index b9890a0..d0ed9f7 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Recipes</name>
     <description>All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-test/pom.xml
----------------------------------------------------------------------
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 1a749c5..afa5857 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-test</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Testing</name>
     <description>Unit testing utilities.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-x-discovery-server/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-discovery-server/pom.xml b/curator-x-discovery-server/pom.xml
index 5f177f1..24a3c8a 100644
--- a/curator-x-discovery-server/pom.xml
+++ b/curator-x-discovery-server/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-x-discovery-server</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Service Discovery Server</name>
     <description>Bridges non-Java or legacy applications with the Curator Service Discovery.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/curator-x-discovery/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index d52aa86..c6ca970 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -24,12 +24,12 @@
     <parent>
         <groupId>org.apache.curator</groupId>
         <artifactId>apache-curator</artifactId>
-        <version>2.0.1-incubating-SNAPSHOT</version>
+        <version>2.0.1-incubating</version>
     </parent>
 
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-x-discovery</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
 
     <name>Curator Service Discovery</name>
     <description>A service discovery recipe.</description>

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6ac0133c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fc0efa6..358952f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <groupId>org.apache.curator</groupId>
     <artifactId>apache-curator</artifactId>
-    <version>2.0.1-incubating-SNAPSHOT</version>
+    <version>2.0.1-incubating</version>
     <packaging>pom</packaging>
 
     <name>Apache Curator</name>
@@ -62,7 +62,7 @@
         <url>https://git-wip-us.apache.org/repos/asf?p=incubator-curator.git</url>
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-curator.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-curator.git</developerConnection>
-        <tag>HEAD</tag>
+        <tag>apache-curator-2.0.1-incubating</tag>
     </scm>
 
     <issueManagement>
@@ -218,37 +218,37 @@
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-client</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-framework</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-recipes</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-test</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
 
             <dependency>
                 <groupId>org.apache.curator</groupId>
                 <artifactId>curator-x-discovery-server</artifactId>
-                <version>2.0.1-incubating-SNAPSHOT</version>
+                <version>2.0.1-incubating</version>
             </dependency>
         </dependencies>
     </dependencyManagement>


[09/12] git commit: Introduced CloseableExecutorService. Instead of blindly shutting down executors, this container shuts down any futures created by an executor. This resolves issues where custom executors are given to Curator.

Posted by ra...@apache.org.
Introduced CloseableExecutorService. Instead of blindly shutting down executors, this container
shuts down any futures created by an executor. This resolves issues where custom executors
are given to Curator.

Merge branch 'CURATOR-17' into 2.0.1-incubating

Conflicts:
	curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6e3c9e27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6e3c9e27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6e3c9e27

Branch: refs/heads/master
Commit: 6e3c9e27b8e6e2cda6ad146b8a63a60346badb0d
Parents: 11ae23a 601bc4c
Author: randgalt <ra...@apache.org>
Authored: Fri May 10 19:11:23 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Fri May 10 19:11:23 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |  123 +++++++
 .../utils/CloseableScheduledExecutorService.java   |   72 ++++
 .../utils/TestCloseableExecutorService.java        |  252 +++++++++++++++
 .../framework/recipes/cache/PathChildrenCache.java |    9 +-
 .../framework/recipes/locks/ChildReaper.java       |    9 +-
 .../curator/framework/recipes/locks/Reaper.java    |   25 +-
 .../recipes/cache/TestPathChildrenCache.java       |   27 +--
 .../framework/recipes/locks/TestReaper.java        |   55 ++--
 8 files changed, 502 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 76efc6c,61c3af7..ec2d328
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@@ -350,11 -355,10 +351,11 @@@ public class PathChildrenCache implemen
      @Override
      public void close() throws IOException
      {
 -        Preconditions.checkState(!executorService.isShutdown(), "has not been started");
 -
 -        client.getConnectionStateListenable().removeListener(connectionStateListener);
 -        executorService.close();
 +        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
 +        {
 +            client.getConnectionStateListenable().removeListener(connectionStateListener);
-             executorService.shutdownNow();
++            executorService.close();
 +        }
      }
  
      /**
@@@ -751,24 -749,9 +752,24 @@@
          }
      }
  
 -    private void offerOperation(Operation operation)
 +    /**
 +     * Submits a runnable to the executor.
 +     * <p/>
 +     * This method is synchronized because it has to check state about whether this instance is still open.  Without this check
 +     * there is a race condition with the dataWatchers that get set.  Even after this object is closed() it can still be
 +     * called by those watchers, because the close() method cannot actually disable the watcher.
 +     * <p/>
 +     * The synchronization overhead should be minimal, if not non-existent, as this is generally only called from the
 +     * ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
 +     * we want to protect from.
 +     *
 +     * @param command The runnable to run
 +     */
 +    private synchronized void submitToExecutor(final Runnable command)
      {
 -        operations.remove(operation);   // avoids herding for refresh operations
 -        operations.offer(operation);
 +        if ( state.get() == State.STARTED )
 +        {
-             executorService.execute(command);
++            executorService.submit(command);
 +        }
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --cc curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 7fed5f8,e51125b..4b117fb
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@@ -730,253 -728,4 +729,229 @@@ public class TestPathChildrenCache exte
              client.close();
          }
      }
 +
 +    @Test
 +    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
 +    {
 +        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
 +        client.start();
 +        try
 +        {
 +            client.create().forPath("/test");
 +
 +            final BlockingQueue<PathChildrenCacheEvent.Type> events = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-             final ExecutorService exec = new ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor());
++            final ExecutorService exec = Executors.newSingleThreadExecutor();
 +            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
 +            cache.getListenable().addListener
 +                (
 +                    new PathChildrenCacheListener()
 +                    {
 +                        @Override
 +                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
 +                        {
 +                            if ( event.getData().getPath().equals("/test/one") )
 +                            {
 +                                events.offer(event.getType());
 +                            }
 +                        }
 +                    }
 +                );
 +            cache.start();
 +
 +            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
 +            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", true, false, exec);
 +            cache2.getListenable().addListener(
 +                    new PathChildrenCacheListener() {
 +                        @Override
 +                        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
 +                                throws Exception
 +                        {
 +                            if ( event.getData().getPath().equals("/test/one") )
 +                            {
 +                                events2.offer(event.getType());
 +                            }
 +                        }
 +                    }
 +            );
 +            cache2.start();
 +
 +            client.create().forPath("/test/one", "hey there".getBytes());
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
 +
 +            client.setData().forPath("/test/one", "sup!".getBytes());
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
 +            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "sup!");
 +            Assert.assertEquals(new String(cache2.getCurrentData("/test/one").getData()), "sup!");
 +
 +            client.delete().forPath("/test/one");
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
 +
 +            cache.close();
 +            cache2.close();
 +        }
 +        finally
 +        {
 +            client.close();
 +        }
 +    }
 +
 +    @Test
 +    public void testDeleteNodeAfterCloseDoesntCallExecutor()
 +            throws Exception
 +    {
 +        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
 +        client.start();
 +        try
 +        {
 +            client.create().forPath("/test");
 +
 +            final ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
 +            PathChildrenCache cache = new PathChildrenCache(client, "/test", true, false, exec);
 +
 +            cache.start();
 +            client.create().forPath("/test/one", "hey there".getBytes());
 +
 +            cache.rebuild();
 +            Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
 +            Assert.assertTrue(exec.isExecuteCalled());
 +
 +            exec.setExecuteCalled(false);
 +            cache.close();
 +            Assert.assertFalse(exec.isExecuteCalled());
 +
 +            client.delete().forPath("/test/one");
 +            Thread.sleep(100);
 +            Assert.assertFalse(exec.isExecuteCalled());
 +        }
 +        finally {
 +            client.close();
 +        }
 +
 +    }
 +
 +    public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
 +    {
 +        boolean executeCalled = false;
 +
 +        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
 +        {
 +            super(delegate);
 +        }
 +
 +        @Override
 +        public synchronized void execute(Runnable command)
 +        {
 +            executeCalled = true;
 +            super.execute(command);
 +        }
 +
 +        public synchronized boolean isExecuteCalled()
 +        {
 +            return executeCalled;
 +        }
 +
 +        public synchronized void setExecuteCalled(boolean executeCalled)
 +        {
 +            this.executeCalled = executeCalled;
 +        }
 +    }
 +
-     /**
-      * This is required to work around https://issues.apache.org/jira/browse/CURATOR-17
-      */
-     public static class ShutdownNowIgnoringExecutorService extends DelegatingExecutorService
-     {
-         public ShutdownNowIgnoringExecutorService(ExecutorService delegate)
-         {
-             super(delegate);
-         }
- 
-         @Override
-         public void shutdown()
-         {
-             // ignore
-         }
- 
-         @Override
-         public List<Runnable> shutdownNow()
-         {
-             // ignore
-             return ImmutableList.of();
-         }
-     }
- 
 +    public static class DelegatingExecutorService implements ExecutorService
 +    {
 +        private final ExecutorService delegate;
 +
 +        public DelegatingExecutorService(
 +                ExecutorService delegate
 +        )
 +        {
 +            this.delegate = delegate;
 +        }
 +
 +
 +        @Override
 +        public void shutdown()
 +        {
 +            delegate.shutdown();
 +        }
 +
 +        @Override
 +        public List<Runnable> shutdownNow()
 +        {
 +            return delegate.shutdownNow();
 +        }
 +
 +        @Override
 +        public boolean isShutdown()
 +        {
 +            return delegate.isShutdown();
 +        }
 +
 +        @Override
 +        public boolean isTerminated()
 +        {
 +            return delegate.isTerminated();
 +        }
 +
 +        @Override
 +        public boolean awaitTermination(long timeout, TimeUnit unit)
 +                throws InterruptedException
 +        {
 +            return delegate.awaitTermination(timeout, unit);
 +        }
 +
 +        @Override
 +        public <T> Future<T> submit(Callable<T> task)
 +        {
 +            return delegate.submit(task);
 +        }
 +
 +        @Override
 +        public <T> Future<T> submit(Runnable task, T result)
 +        {
 +            return delegate.submit(task, result);
 +        }
 +
 +        @Override
 +        public Future<?> submit(Runnable task)
 +        {
 +            return delegate.submit(task);
 +        }
 +
 +        @Override
 +        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 +                throws InterruptedException
 +        {
 +            return delegate.invokeAll(tasks);
 +        }
 +
 +        @Override
 +        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
 +                throws InterruptedException
 +        {
 +            return delegate.invokeAll(tasks, timeout, unit);
 +        }
 +
 +        @Override
 +        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 +                throws InterruptedException, ExecutionException
 +        {
 +            return delegate.invokeAny(tasks);
 +        }
 +
 +        @Override
 +        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
 +                throws InterruptedException, ExecutionException, TimeoutException
 +        {
 +            return delegate.invokeAny(tasks, timeout, unit);
 +        }
 +
 +        @Override
 +        public void execute(Runnable command)
 +        {
 +            delegate.execute(command);
 +        }
 +    }
  }


[11/12] git commit: Added license headers

Posted by ra...@apache.org.
Added license headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/39ca0565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/39ca0565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/39ca0565

Branch: refs/heads/master
Commit: 39ca0565ec0e1869c2fe0211a9f0b04126121048
Parents: 70fcae0
Author: randgalt <ra...@apache.org>
Authored: Sat May 11 12:58:01 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Sat May 11 12:58:01 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |    2 +-
 .../utils/CloseableScheduledExecutorService.java   |    2 +-
 .../utils/TestCloseableExecutorService.java        |    2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/39ca0565/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index 8ff7b32..bb4855d 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/39ca0565/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index c2bea63..6f3797d 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/39ca0565/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
index cb71116..c083655 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information


[03/12] git commit: CURATOR-17 PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, Curator (in general) should only close tasks it has started. This is now done via wrapped executors in a new class that tracks tasks started

Posted by ra...@apache.org.
CURATOR-17
PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, Curator (in general) should only close tasks it has started. This is now done
via wrapped executors in a new class that tracks tasks started by Curator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/10df9fc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/10df9fc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/10df9fc2

Branch: refs/heads/master
Commit: 10df9fc24a2c36eac9812af1cc4c10c0bab1ae9b
Parents: 86b82ab
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 13:11:49 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 13:11:49 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |  111 +++++++++-
 .../utils/CloseableExecutorServiceBase.java        |  124 -----------
 .../utils/CloseableScheduledExecutorService.java   |  104 ++++------
 .../org/apache/curator/utils/FutureContainer.java  |   91 --------
 .../utils/TestCloseableExecutorService.java        |  160 ++++++---------
 .../framework/recipes/cache/PathChildrenCache.java |   16 +-
 .../framework/recipes/locks/ChildReaper.java       |    9 +-
 .../curator/framework/recipes/locks/Reaper.java    |   11 +-
 .../framework/recipes/locks/TestReaper.java        |   55 +++---
 9 files changed, 258 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index cf92ef4..4024d29 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -1,28 +1,121 @@
 package org.apache.curator.utils;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on an ExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
  */
-public class CloseableExecutorService extends CloseableExecutorServiceBase
+public class CloseableExecutorService implements Closeable
 {
-    private final ListeningExecutorService executorService;
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final ExecutorService executorService;
+    protected final AtomicBoolean isOpen = new AtomicBoolean(true);
+
+    protected class InternalFutureTask<T> extends FutureTask<T>
+    {
+        private final RunnableFuture<T> task;
+
+        InternalFutureTask(RunnableFuture<T> task)
+        {
+            super(task, null);
+            this.task = task;
+            futures.add(task);
+        }
+
+        protected void done()
+        {
+            futures.remove(task);
+        }
+    }
 
     /**
      * @param executorService the service to decorate
      */
     public CloseableExecutorService(ExecutorService executorService)
     {
-        this.executorService = MoreExecutors.listeningDecorator(executorService);
+        this.executorService = executorService;
+    }
+
+    /**
+     * Returns <tt>true</tt> if this executor has been shut down.
+     *
+     * @return <tt>true</tt> if this executor has been shut down
+     */
+    public boolean isShutdown()
+    {
+        return !isOpen.get();
     }
 
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    /**
+     * Closes any tasks currently in progress
+     */
     @Override
-    protected ListeningExecutorService getService()
+    public void close()
     {
-        return executorService;
+        isOpen.set(false);
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            if ( !future.cancel(true) )
+            {
+                System.err.println("Could not cancel");
+                throw new RuntimeException("Could not cancel");
+            }
+        }
+    }
+
+    /**
+     * Submits a value-returning task for execution and returns a Future
+     * representing the pending results of the task.  Upon completion,
+     * this task may be taken or polled.
+     *
+     * @param task the task to submit
+     * @return a future to watch the task
+     */
+    public<V> Future<V> submit(Callable<V> task)
+    {
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+        InternalFutureTask<V> futureTask = new InternalFutureTask<V>(new FutureTask<V>(task));
+        executorService.execute(futureTask);
+        return futureTask;
+    }
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task.  Upon completion, this task may be
+     * taken or polled.
+     *
+     * @param task the task to submit
+     * @return a future to watch the task
+     */
+    public Future<?> submit(Runnable task)
+    {
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        executorService.execute(futureTask);
+        return futureTask;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
deleted file mode 100644
index 92371d7..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
- */
-abstract class CloseableExecutorServiceBase implements Closeable
-{
-    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    protected abstract ListeningExecutorService getService();
-
-    @Override
-    public void close()
-    {
-        isClosed.set(true);
-        Iterator<Future<?>> iterator = futures.iterator();
-        while ( iterator.hasNext() )
-        {
-            Future<?> future = iterator.next();
-            iterator.remove();
-            future.cancel(true);
-        }
-    }
-
-    /**
-     * @see ExecutorService#isShutdown()
-     * @return true/false
-     */
-    public boolean isShutdown()
-    {
-        return getService().isShutdown();
-    }
-
-    /**
-     * @see ExecutorService#isTerminated()
-     * @return true/false
-     */
-    public boolean isTerminated()
-    {
-        return getService().isTerminated();
-    }
-
-    /**
-     * Calls {@link ExecutorService#submit(Callable)}, records
-     * and returns the future
-     *
-     * @param task task to submit
-     * @return the future
-     */
-    public <T> Future<T> submit(Callable<T> task)
-    {
-        return record(getService().submit(task));
-    }
-
-    /**
-     * Calls {@link ExecutorService#submit(Runnable)}, records
-     * and returns the future
-     *
-     * @param task task to submit
-     * @return the future
-     */
-    public Future<?> submit(Runnable task)
-    {
-        return record(getService().submit(task));
-    }
-
-    @VisibleForTesting
-    int size()
-    {
-        return futures.size();
-    }
-
-    protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
-    {
-        if ( isClosed.get() )
-        {
-            future.cancel(true);
-        }
-        else
-        {
-            futures.add(future);
-        }
-        return future;
-    }
-
-    protected <T> Future<T> record(final ListenableFuture<T> future)
-    {
-        Runnable listener = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                futures.remove(future);
-            }
-        };
-        if ( isClosed.get() )
-        {
-            future.cancel(true);
-        }
-        else
-        {
-            futures.add(future);
-            future.addListener(listener, MoreExecutors.sameThreadExecutor());
-        }
-        return future;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 8638ee6..737ff6b 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -1,100 +1,72 @@
 package org.apache.curator.utils;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on a ScheduledExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
  */
-public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+public class CloseableScheduledExecutorService extends CloseableExecutorService
 {
-    private final ListeningScheduledExecutorService executorService;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     /**
-     * @param executorService the service to decorate
+     * @param scheduledExecutorService the service to decorate
      */
-    public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+    public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
     {
-        this.executorService = MoreExecutors.listeningDecorator(executorService);
-    }
-
-    @Override
-    protected ListeningExecutorService getService()
-    {
-        return executorService;
+        super(scheduledExecutorService);
+        this.scheduledExecutorService = scheduledExecutorService;
     }
 
     /**
-     * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
-     * and returns the future
+     * Creates and executes a one-shot action that becomes enabled
+     * after the given delay.
      *
-     * @param command the task to execute
+     * @param task  the task to execute
      * @param delay the time from now to delay execution
-     * @param unit the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of
+     * @param unit  the time unit of the delay parameter
+     * @return a Future representing pending completion of
      *         the task and whose <tt>get()</tt> method will return
      *         <tt>null</tt> upon completion
      */
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    public Future<?> schedule(Runnable task, long delay, TimeUnit unit)
     {
-        return record(executorService.schedule(command, delay, unit));
-    }
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
 
-    /**
-     * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
-     * and returns the future
-     *
-     * @param callable the task to execute
-     * @param delay the time from now to delay execution
-     * @param unit the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of
-     *         the task and whose <tt>get()</tt> method will return
-     *         <tt>null</tt> upon completion
-     */
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
-    {
-        return record(executorService.schedule(callable, delay, unit));
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        scheduledExecutorService.schedule(futureTask, delay, unit);
+        return futureTask;
     }
 
     /**
-     * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
-     * and returns the future
+     * Creates and executes a periodic action that becomes enabled first
+     * after the given initial delay, and subsequently with the
+     * given delay between the termination of one execution and the
+     * commencement of the next.  If any execution of the task
+     * encounters an exception, subsequent executions are suppressed.
+     * Otherwise, the task will only terminate via cancellation or
+     * termination of the executor.
      *
-     * @param command the task to execute
+     * @param task      the task to execute
      * @param initialDelay the time to delay first execution
-     * @param period the period between successive executions
-     * @param unit the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of
+     * @param delay        the delay between the termination of one
+     *                     execution and the commencement of the next
+     * @param unit         the time unit of the initialDelay and delay parameters
+     * @return a Future representing pending completion of
      *         the task, and whose <tt>get()</tt> method will throw an
      *         exception upon cancellation
      */
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    public Future<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
     {
-        return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
-    }
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
 
-    /**
-     * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
-     * and returns the future
-     *
-     * @param command the task to execute
-     * @param initialDelay the time to delay first execution
-     * @param delay the delay between the termination of one
-     * execution and the commencement of the next
-     * @param unit the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of
-     *         the task, and whose <tt>get()</tt> method will throw an
-     *         exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
-    {
-        return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit);
+        return futureTask;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
deleted file mode 100644
index 51fe6a4..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-
-public class FutureContainer implements Closeable
-{
-    private final List<Future<?>> futures = Lists.newArrayList();
-    private final ExecutorService executorService;
-
-    private class QueueingFuture<T> extends FutureTask<T>
-    {
-        private final RunnableFuture<T> task;
-
-        QueueingFuture(RunnableFuture<T> task)
-        {
-            super(task, null);
-            this.task = task;
-            futures.add(task);
-        }
-
-        protected void done()
-        {
-            futures.remove(task);
-        }
-    }
-
-    public FutureContainer(ExecutorService executorService)
-    {
-        this.executorService = executorService;
-    }
-
-    @VisibleForTesting
-    int size()
-    {
-        return futures.size();
-    }
-
-    @Override
-    public void close()
-    {
-        Iterator<Future<?>> iterator = futures.iterator();
-        while ( iterator.hasNext() )
-        {
-            Future<?> future = iterator.next();
-            iterator.remove();
-            if ( !future.cancel(true) )
-            {
-                System.err.println("Could not cancel");
-                throw new RuntimeException("Could not cancel");
-            }
-        }
-    }
-
-    /**
-     * Submits a value-returning task for execution and returns a Future
-     * representing the pending results of the task.  Upon completion,
-     * this task may be taken or polled.
-     *
-     * @param task the task to submit
-     */
-    public<V> void submit(Callable<V> task)
-    {
-        FutureTask<V> futureTask = new FutureTask<V>(task);
-        executorService.execute(new QueueingFuture<V>(futureTask));
-    }
-
-    /**
-     * Submits a Runnable task for execution and returns a Future
-     * representing that task.  Upon completion, this task may be
-     * taken or polled.
-     *
-     * @param task the task to submit
-     */
-    public void submit(Runnable task)
-    {
-        FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
-        executorService.execute(new QueueingFuture<Void>(futureTask));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
index 2cd2901..72b63fd 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -12,20 +12,17 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestCloseableExecutorService
 {
     private static final int QTY = 10;
 
     private volatile ExecutorService executorService;
-    private volatile AtomicInteger count;
 
     @BeforeMethod
     public void setup()
     {
         executorService = Executors.newFixedThreadPool(QTY * 2);
-        count = new AtomicInteger(0);
     }
 
     @AfterMethod
@@ -39,7 +36,7 @@ public class TestCloseableExecutorService
     {
         try
         {
-            FutureContainer service = new FutureContainer(executorService);
+            CloseableExecutorService service = new CloseableExecutorService(executorService);
             CountDownLatch startLatch = new CountDownLatch(QTY);
             CountDownLatch latch = new CountDownLatch(QTY);
             for ( int i = 0; i < QTY; ++i )
@@ -47,9 +44,9 @@ public class TestCloseableExecutorService
                 submitRunnable(service, startLatch, latch);
             }
 
-            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
             service.close();
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
         }
         catch ( AssertionError e )
         {
@@ -65,41 +62,39 @@ public class TestCloseableExecutorService
     public void testBasicCallable() throws InterruptedException
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
-        List<CountDownLatch> latches = Lists.newArrayList();
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
+        final CountDownLatch latch = new CountDownLatch(QTY);
         for ( int i = 0; i < QTY; ++i )
         {
-            final CountDownLatch latch = new CountDownLatch(1);
-            latches.add(latch);
             service.submit
-                (
-                    new Callable<Void>()
+            (
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
                     {
-                        @Override
-                        public Void call() throws Exception
+                        try
+                        {
+                            startLatch.countDown();
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        finally
                         {
-                            try
-                            {
-                                Thread.currentThread().join();
-                            }
-                            catch ( InterruptedException e )
-                            {
-                                Thread.currentThread().interrupt();
-                            }
-                            finally
-                            {
-                                latch.countDown();
-                            }
-                            return null;
+                            latch.countDown();
                         }
+                        return null;
                     }
-                );
+                }
+            );
         }
 
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
         service.close();
-        for ( CountDownLatch latch : latches )
-        {
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
-        }
+        Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
     }
 
     @Test
@@ -107,6 +102,7 @@ public class TestCloseableExecutorService
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
         List<Future<?>> futures = Lists.newArrayList();
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
         for ( int i = 0; i < QTY; ++i )
         {
             Future<?> future = service.submit
@@ -118,6 +114,7 @@ public class TestCloseableExecutorService
                     {
                         try
                         {
+                            startLatch.countDown();
                             Thread.currentThread().join();
                         }
                         catch ( InterruptedException e )
@@ -130,7 +127,7 @@ public class TestCloseableExecutorService
             futures.add(future);
         }
 
-        Thread.sleep(100);
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
 
         for ( Future<?> future : futures )
         {
@@ -144,6 +141,7 @@ public class TestCloseableExecutorService
     public void testListeningCallable() throws InterruptedException
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
         List<Future<?>> futures = Lists.newArrayList();
         for ( int i = 0; i < QTY; ++i )
         {
@@ -156,6 +154,7 @@ public class TestCloseableExecutorService
                     {
                         try
                         {
+                            startLatch.countDown();
                             Thread.currentThread().join();
                         }
                         catch ( InterruptedException e )
@@ -169,8 +168,7 @@ public class TestCloseableExecutorService
             futures.add(future);
         }
 
-        Thread.sleep(100);
-
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
         for ( Future<?> future : futures )
         {
             future.cancel(true);
@@ -182,69 +180,52 @@ public class TestCloseableExecutorService
     @Test
     public void testPartialRunnable() throws InterruptedException
     {
-        try
-        {
-            final CountDownLatch outsideLatch = new CountDownLatch(1);
-            executorService.submit
-            (
-                new Runnable()
+        final CountDownLatch outsideLatch = new CountDownLatch(1);
+        executorService.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
+                    try
                     {
-                        try
-                        {
-                            Thread.currentThread().join();
-                        }
-                        catch ( InterruptedException e )
-                        {
-                            Thread.currentThread().interrupt();
-                        }
-                        finally
-                        {
-                            outsideLatch.countDown();
-                        }
+                        Thread.currentThread().join();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                    finally
+                    {
+                        outsideLatch.countDown();
                     }
                 }
-            );
-
-            FutureContainer service = new FutureContainer(executorService);
-            CountDownLatch startLatch = new CountDownLatch(QTY);
-            CountDownLatch latch = new CountDownLatch(QTY);
-            for ( int i = 0; i < QTY; ++i )
-            {
-                submitRunnable(service, startLatch, latch);
             }
+        );
 
-            while ( service.size() < QTY )
-            {
-                Thread.sleep(100);
-            }
-
-            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
-            service.close();
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
-            Assert.assertEquals(outsideLatch.getCount(), 1);
-        }
-        catch ( AssertionError e )
-        {
-            throw e;
-        }
-        catch ( Throwable e )
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        CountDownLatch startLatch = new CountDownLatch(QTY);
+        CountDownLatch latch = new CountDownLatch(QTY);
+        for ( int i = 0; i < QTY; ++i )
         {
-            e.printStackTrace();
+            submitRunnable(service, startLatch, latch);
         }
-        finally
+
+        while ( service.size() < QTY )
         {
-            executorService.shutdownNow();
+            Thread.sleep(100);
         }
+
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
+        service.close();
+        Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+        Assert.assertEquals(outsideLatch.getCount(), 1);
     }
 
-    private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+    private void submitRunnable(CloseableExecutorService service, final CountDownLatch startLatch, final CountDownLatch latch)
     {
-        try
-        {
-            service.submit
+        service.submit
             (
                 new Runnable()
                 {
@@ -254,29 +235,18 @@ public class TestCloseableExecutorService
                         try
                         {
                             startLatch.countDown();
-                            count.incrementAndGet();
                             Thread.sleep(100000);
                         }
                         catch ( InterruptedException e )
                         {
                             Thread.currentThread().interrupt();
                         }
-                        catch ( Throwable e )
-                        {
-                            e.printStackTrace();
-                        }
                         finally
                         {
-    //                        count.decrementAndGet();
                             latch.countDown();
                         }
                     }
                 }
             );
-        }
-        catch ( Throwable e )
-        {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index f42039c..61c3af7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -263,16 +263,16 @@ public class PathChildrenCache implements Closeable
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
         executorService.submit
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
+            (
+                new Runnable()
                 {
-                    mainLoop();
+                    @Override
+                    public void run()
+                    {
+                        mainLoop();
+                    }
                 }
-            }
-        );
+            );
 
         switch ( mode )
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index b0717ed..c8c1510 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.data.Stat;
@@ -29,8 +30,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,10 +47,10 @@ public class ChildReaper implements Closeable
     private final CuratorFramework client;
     private final String path;
     private final Reaper.Mode mode;
-    private final ScheduledExecutorService executor;
+    private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
 
-    private volatile ScheduledFuture<?> task;
+    private volatile Future<?> task;
 
     private enum State
     {
@@ -91,7 +92,7 @@ public class ChildReaper implements Closeable
         this.client = client;
         this.path = path;
         this.mode = mode;
-        this.executor = executor;
+        this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs;
         this.reaper = new Reaper(client, executor, reapingThresholdMs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index b540689..037eacd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -153,7 +154,7 @@ public class Reaper implements Closeable
     public void addPath(String path, Mode mode)
     {
         activePaths.add(path);
-        executor.schedule(new PathHolder(path, mode, 0), reapingThresholdMs, TimeUnit.MILLISECONDS);
+        schedule(new PathHolder(path, mode, 0), reapingThresholdMs);
     }
 
     /**
@@ -186,6 +187,12 @@ public class Reaper implements Closeable
         }
     }
 
+    @VisibleForTesting
+    protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
+    {
+        return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+    }
+
     private void reap(PathHolder holder)
     {
         if ( !activePaths.contains(holder.path) )
@@ -251,7 +258,7 @@ public class Reaper implements Closeable
         }
         else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) )
         {
-            executor.schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs, TimeUnit.MILLISECONDS);
+            schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index 596960d..bd821e4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.io.Closeables;
+import junit.framework.Assert;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -27,13 +28,20 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
-import junit.framework.Assert;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.testng.annotations.Test;
 import java.io.IOException;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class TestReaper extends BaseClassForTests
 {
@@ -106,37 +114,36 @@ public class TestReaper extends BaseClassForTests
 
             final Queue<Reaper.PathHolder>  holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
             final ExecutorService           pool = Executors.newCachedThreadPool();
-            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1)
+            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
+
+            reaper = new Reaper
+            (
+                client,
+                service,
+                THRESHOLD
+            )
             {
                 @Override
-                public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+                protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
                 {
-                    final Reaper.PathHolder     pathHolder = (Reaper.PathHolder)command;
                     holders.add(pathHolder);
-                    final ScheduledFuture<?>    f = super.schedule(command, delay, unit);
+                    final Future<?>    f = super.schedule(pathHolder, reapingThresholdMs);
                     pool.submit
-                    (
-                        new Callable<Void>()
-                        {
-                            @Override
-                            public Void call() throws Exception
+                        (
+                            new Callable<Void>()
                             {
-                                f.get();
-                                holders.remove(pathHolder);
-                                return null;
+                                @Override
+                                public Void call() throws Exception
+                                {
+                                    f.get();
+                                    holders.remove(pathHolder);
+                                    return null;
+                                }
                             }
-                        }
-                    );
-                    return f;
+                        );
+                    return null;
                 }
             };
-
-            reaper = new Reaper
-            (
-                client,
-                service,
-                THRESHOLD
-            );
             reaper.start();
             reaper.addPath("/one/two/three");
 


[04/12] git commit: Merge branch 'master' into 2.0.1-incubating

Posted by ra...@apache.org.
Merge branch 'master' into 2.0.1-incubating


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/9acf5929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/9acf5929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/9acf5929

Branch: refs/heads/master
Commit: 9acf5929f54493759411cbf4fd38b5fd56261aaf
Parents: 75b5404 9eff1cf
Author: randgalt <ra...@apache.org>
Authored: Thu May 9 16:23:25 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Thu May 9 16:23:25 2013 -0700

----------------------------------------------------------------------
 src/site/site.xml |    1 +
 1 files changed, 1 insertions(+), 0 deletions(-)
----------------------------------------------------------------------