You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/06 22:13:01 UTC

[1/2] git commit: temp checkin

Updated Branches:
  refs/heads/CURATOR-17 [created] 10df9fc24


temp checkin


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86b82ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86b82ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86b82ab6

Branch: refs/heads/CURATOR-17
Commit: 86b82ab61d951429121e2f869bc00ff4cede9407
Parents: 97cda39
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 11:56:19 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 11:56:19 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |   28 ++
 .../utils/CloseableExecutorServiceBase.java        |  124 +++++++
 .../utils/CloseableScheduledExecutorService.java   |  100 +++++
 .../org/apache/curator/utils/FutureContainer.java  |   91 +++++
 .../utils/TestCloseableExecutorService.java        |  282 +++++++++++++++
 .../framework/recipes/cache/PathChildrenCache.java |   25 +-
 .../curator/framework/recipes/locks/Reaper.java    |   14 +-
 7 files changed, 642 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
new file mode 100644
index 0000000..cf92ef4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -0,0 +1,28 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+public class CloseableExecutorService extends CloseableExecutorServiceBase
+{
+    private final ListeningExecutorService executorService;
+
+    /**
+     * @param executorService the service to decorate
+     */
+    public CloseableExecutorService(ExecutorService executorService)
+    {
+        this.executorService = MoreExecutors.listeningDecorator(executorService);
+    }
+
+    @Override
+    protected ListeningExecutorService getService()
+    {
+        return executorService;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
new file mode 100644
index 0000000..92371d7
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
@@ -0,0 +1,124 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+abstract class CloseableExecutorServiceBase implements Closeable
+{
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    protected abstract ListeningExecutorService getService();
+
+    @Override
+    public void close()
+    {
+        isClosed.set(true);
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            future.cancel(true);
+        }
+    }
+
+    /**
+     * @see ExecutorService#isShutdown()
+     * @return true/false
+     */
+    public boolean isShutdown()
+    {
+        return getService().isShutdown();
+    }
+
+    /**
+     * @see ExecutorService#isTerminated()
+     * @return true/false
+     */
+    public boolean isTerminated()
+    {
+        return getService().isTerminated();
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Callable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return record(getService().submit(task));
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Runnable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public Future<?> submit(Runnable task)
+    {
+        return record(getService().submit(task));
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
+    {
+        if ( isClosed.get() )
+        {
+            future.cancel(true);
+        }
+        else
+        {
+            futures.add(future);
+        }
+        return future;
+    }
+
+    protected <T> Future<T> record(final ListenableFuture<T> future)
+    {
+        Runnable listener = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                futures.remove(future);
+            }
+        };
+        if ( isClosed.get() )
+        {
+            future.cancel(true);
+        }
+        else
+        {
+            futures.add(future);
+            future.addListener(listener, MoreExecutors.sameThreadExecutor());
+        }
+        return future;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
new file mode 100644
index 0000000..8638ee6
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -0,0 +1,100 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ */
+public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+{
+    private final ListeningScheduledExecutorService executorService;
+
+    /**
+     * @param executorService the service to decorate
+     */
+    public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+    {
+        this.executorService = MoreExecutors.listeningDecorator(executorService);
+    }
+
+    @Override
+    protected ListeningExecutorService getService()
+    {
+        return executorService;
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose <tt>get()</tt> method will return
+     *         <tt>null</tt> upon completion
+     */
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        return record(executorService.schedule(command, delay, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param callable the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose <tt>get()</tt> method will return
+     *         <tt>null</tt> upon completion
+     */
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+    {
+        return record(executorService.schedule(callable, delay, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
+    }
+
+    /**
+     * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
+     * and returns the future
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param delay the delay between the termination of one
+     * execution and the commencement of the next
+     * @param unit the time unit of the initialDelay and delay parameters
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose <tt>get()</tt> method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
new file mode 100644
index 0000000..51fe6a4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
@@ -0,0 +1,91 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+public class FutureContainer implements Closeable
+{
+    private final List<Future<?>> futures = Lists.newArrayList();
+    private final ExecutorService executorService;
+
+    private class QueueingFuture<T> extends FutureTask<T>
+    {
+        private final RunnableFuture<T> task;
+
+        QueueingFuture(RunnableFuture<T> task)
+        {
+            super(task, null);
+            this.task = task;
+            futures.add(task);
+        }
+
+        protected void done()
+        {
+            futures.remove(task);
+        }
+    }
+
+    public FutureContainer(ExecutorService executorService)
+    {
+        this.executorService = executorService;
+    }
+
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    @Override
+    public void close()
+    {
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            if ( !future.cancel(true) )
+            {
+                System.err.println("Could not cancel");
+                throw new RuntimeException("Could not cancel");
+            }
+        }
+    }
+
+    /**
+     * Submits a value-returning task for execution and returns a Future
+     * representing the pending results of the task.  Upon completion,
+     * this task may be taken or polled.
+     *
+     * @param task the task to submit
+     */
+    public<V> void submit(Callable<V> task)
+    {
+        FutureTask<V> futureTask = new FutureTask<V>(task);
+        executorService.execute(new QueueingFuture<V>(futureTask));
+    }
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task.  Upon completion, this task may be
+     * taken or polled.
+     *
+     * @param task the task to submit
+     */
+    public void submit(Runnable task)
+    {
+        FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
+        executorService.execute(new QueueingFuture<Void>(futureTask));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
new file mode 100644
index 0000000..2cd2901
--- /dev/null
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -0,0 +1,282 @@
+package org.apache.curator.utils;
+
+import com.google.common.collect.Lists;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCloseableExecutorService
+{
+    private static final int QTY = 10;
+
+    private volatile ExecutorService executorService;
+    private volatile AtomicInteger count;
+
+    @BeforeMethod
+    public void setup()
+    {
+        executorService = Executors.newFixedThreadPool(QTY * 2);
+        count = new AtomicInteger(0);
+    }
+
+    @AfterMethod
+    public void tearDown()
+    {
+        executorService.shutdownNow();
+    }
+
+    @Test
+    public void testBasicRunnable() throws InterruptedException
+    {
+        try
+        {
+            FutureContainer service = new FutureContainer(executorService);
+            CountDownLatch startLatch = new CountDownLatch(QTY);
+            CountDownLatch latch = new CountDownLatch(QTY);
+            for ( int i = 0; i < QTY; ++i )
+            {
+                submitRunnable(service, startLatch, latch);
+            }
+
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            service.close();
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+        }
+        catch ( AssertionError e )
+        {
+            throw e;
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testBasicCallable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<CountDownLatch> latches = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            latches.add(latch);
+            service.submit
+                (
+                    new Callable<Void>()
+                    {
+                        @Override
+                        public Void call() throws Exception
+                        {
+                            try
+                            {
+                                Thread.currentThread().join();
+                            }
+                            catch ( InterruptedException e )
+                            {
+                                Thread.currentThread().interrupt();
+                            }
+                            finally
+                            {
+                                latch.countDown();
+                            }
+                            return null;
+                        }
+                    }
+                );
+        }
+
+        service.close();
+        for ( CountDownLatch latch : latches )
+        {
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test
+    public void testListeningRunnable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<Future<?>> futures = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            Future<?> future = service.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            );
+            futures.add(future);
+        }
+
+        Thread.sleep(100);
+
+        for ( Future<?> future : futures )
+        {
+            future.cancel(true);
+        }
+
+        Assert.assertEquals(service.size(), 0);
+    }
+
+    @Test
+    public void testListeningCallable() throws InterruptedException
+    {
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        List<Future<?>> futures = Lists.newArrayList();
+        for ( int i = 0; i < QTY; ++i )
+        {
+            Future<?> future = service.submit
+            (
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        return null;
+                    }
+                }
+            );
+            futures.add(future);
+        }
+
+        Thread.sleep(100);
+
+        for ( Future<?> future : futures )
+        {
+            future.cancel(true);
+        }
+
+        Assert.assertEquals(service.size(), 0);
+    }
+
+    @Test
+    public void testPartialRunnable() throws InterruptedException
+    {
+        try
+        {
+            final CountDownLatch outsideLatch = new CountDownLatch(1);
+            executorService.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        finally
+                        {
+                            outsideLatch.countDown();
+                        }
+                    }
+                }
+            );
+
+            FutureContainer service = new FutureContainer(executorService);
+            CountDownLatch startLatch = new CountDownLatch(QTY);
+            CountDownLatch latch = new CountDownLatch(QTY);
+            for ( int i = 0; i < QTY; ++i )
+            {
+                submitRunnable(service, startLatch, latch);
+            }
+
+            while ( service.size() < QTY )
+            {
+                Thread.sleep(100);
+            }
+
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            service.close();
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertEquals(outsideLatch.getCount(), 1);
+        }
+        catch ( AssertionError e )
+        {
+            throw e;
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            executorService.shutdownNow();
+        }
+    }
+
+    private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+    {
+        try
+        {
+            service.submit
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            startLatch.countDown();
+                            count.incrementAndGet();
+                            Thread.sleep(100000);
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        catch ( Throwable e )
+                        {
+                            e.printStackTrace();
+                        }
+                        finally
+                        {
+    //                        count.decrementAndGet();
+                            latch.countDown();
+                        }
+                    }
+                }
+            );
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..f42039c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
@@ -70,7 +71,7 @@ public class PathChildrenCache implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String path;
-    private final ExecutorService executorService;
+    private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
@@ -197,7 +198,7 @@ public class PathChildrenCache implements Closeable
         this.path = path;
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
-        this.executorService = executorService;
+        this.executorService = new CloseableExecutorService(executorService);
         ensurePath = client.newNamespaceAwareEnsurePath(path);
     }
 
@@ -261,17 +262,17 @@ public class PathChildrenCache implements Closeable
         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
-        executorService.execute
-            (
-                new Runnable()
+        executorService.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
-                    {
-                        mainLoop();
-                    }
+                    mainLoop();
                 }
-            );
+            }
+        );
 
         switch ( mode )
         {
@@ -357,7 +358,7 @@ public class PathChildrenCache implements Closeable
         Preconditions.checkState(!executorService.isShutdown(), "has not been started");
 
         client.getConnectionStateListenable().removeListener(connectionStateListener);
-        executorService.shutdownNow();
+        executorService.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 11efefd..b540689 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -42,7 +43,7 @@ public class Reaper implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
-    private final ScheduledExecutorService executor;
+    private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap());
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -127,7 +128,7 @@ public class Reaper implements Closeable
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
     {
         this.client = client;
-        this.executor = executor;
+        this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
     }
 
@@ -181,14 +182,7 @@ public class Reaper implements Closeable
     {
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
-            try
-            {
-                executor.shutdownNow();
-            }
-            catch ( Exception e )
-            {
-                log.error("Canceling task", e);
-            }
+            executor.close();
         }
     }
 


[2/2] git commit: CURATOR-17 PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, Curator (in general) should only close tasks it has started. This is now done via wrapped executors in a new class that tracks tasks started by Curator.

Posted by ra...@apache.org.
CURATOR-17
PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, Curator (in general) should only close tasks it has started. This is now done
via wrapped executors in a new class that tracks tasks started by Curator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/10df9fc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/10df9fc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/10df9fc2

Branch: refs/heads/CURATOR-17
Commit: 10df9fc24a2c36eac9812af1cc4c10c0bab1ae9b
Parents: 86b82ab
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 13:11:49 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 13:11:49 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |  111 +++++++++-
 .../utils/CloseableExecutorServiceBase.java        |  124 -----------
 .../utils/CloseableScheduledExecutorService.java   |  104 ++++------
 .../org/apache/curator/utils/FutureContainer.java  |   91 --------
 .../utils/TestCloseableExecutorService.java        |  160 ++++++---------
 .../framework/recipes/cache/PathChildrenCache.java |   16 +-
 .../framework/recipes/locks/ChildReaper.java       |    9 +-
 .../curator/framework/recipes/locks/Reaper.java    |   11 +-
 .../framework/recipes/locks/TestReaper.java        |   55 +++---
 9 files changed, 258 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index cf92ef4..4024d29 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -1,28 +1,121 @@
 package org.apache.curator.utils;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on an ExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
  */
-public class CloseableExecutorService extends CloseableExecutorServiceBase
+public class CloseableExecutorService implements Closeable
 {
-    private final ListeningExecutorService executorService;
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final ExecutorService executorService;
+    protected final AtomicBoolean isOpen = new AtomicBoolean(true);
+
+    protected class InternalFutureTask<T> extends FutureTask<T>
+    {
+        private final RunnableFuture<T> task;
+
+        InternalFutureTask(RunnableFuture<T> task)
+        {
+            super(task, null);
+            this.task = task;
+            futures.add(task);
+        }
+
+        protected void done()
+        {
+            futures.remove(task);
+        }
+    }
 
     /**
      * @param executorService the service to decorate
      */
     public CloseableExecutorService(ExecutorService executorService)
     {
-        this.executorService = MoreExecutors.listeningDecorator(executorService);
+        this.executorService = executorService;
+    }
+
+    /**
+     * Returns <tt>true</tt> if this executor has been shut down.
+     *
+     * @return <tt>true</tt> if this executor has been shut down
+     */
+    public boolean isShutdown()
+    {
+        return !isOpen.get();
     }
 
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    /**
+     * Closes any tasks currently in progress
+     */
     @Override
-    protected ListeningExecutorService getService()
+    public void close()
     {
-        return executorService;
+        isOpen.set(false);
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            if ( !future.cancel(true) )
+            {
+                System.err.println("Could not cancel");
+                throw new RuntimeException("Could not cancel");
+            }
+        }
+    }
+
+    /**
+     * Submits a value-returning task for execution and returns a Future
+     * representing the pending results of the task.  Upon completion,
+     * this task may be taken or polled.
+     *
+     * @param task the task to submit
+     * @return a future to watch the task
+     */
+    public<V> Future<V> submit(Callable<V> task)
+    {
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+        InternalFutureTask<V> futureTask = new InternalFutureTask<V>(new FutureTask<V>(task));
+        executorService.execute(futureTask);
+        return futureTask;
+    }
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task.  Upon completion, this task may be
+     * taken or polled.
+     *
+     * @param task the task to submit
+     * @return a future to watch the task
+     */
+    public Future<?> submit(Runnable task)
+    {
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        executorService.execute(futureTask);
+        return futureTask;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
deleted file mode 100644
index 92371d7..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
- */
-abstract class CloseableExecutorServiceBase implements Closeable
-{
-    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
-    private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-    protected abstract ListeningExecutorService getService();
-
-    @Override
-    public void close()
-    {
-        isClosed.set(true);
-        Iterator<Future<?>> iterator = futures.iterator();
-        while ( iterator.hasNext() )
-        {
-            Future<?> future = iterator.next();
-            iterator.remove();
-            future.cancel(true);
-        }
-    }
-
-    /**
-     * @see ExecutorService#isShutdown()
-     * @return true/false
-     */
-    public boolean isShutdown()
-    {
-        return getService().isShutdown();
-    }
-
-    /**
-     * @see ExecutorService#isTerminated()
-     * @return true/false
-     */
-    public boolean isTerminated()
-    {
-        return getService().isTerminated();
-    }
-
-    /**
-     * Calls {@link ExecutorService#submit(Callable)}, records
-     * and returns the future
-     *
-     * @param task task to submit
-     * @return the future
-     */
-    public <T> Future<T> submit(Callable<T> task)
-    {
-        return record(getService().submit(task));
-    }
-
-    /**
-     * Calls {@link ExecutorService#submit(Runnable)}, records
-     * and returns the future
-     *
-     * @param task task to submit
-     * @return the future
-     */
-    public Future<?> submit(Runnable task)
-    {
-        return record(getService().submit(task));
-    }
-
-    @VisibleForTesting
-    int size()
-    {
-        return futures.size();
-    }
-
-    protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
-    {
-        if ( isClosed.get() )
-        {
-            future.cancel(true);
-        }
-        else
-        {
-            futures.add(future);
-        }
-        return future;
-    }
-
-    protected <T> Future<T> record(final ListenableFuture<T> future)
-    {
-        Runnable listener = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                futures.remove(future);
-            }
-        };
-        if ( isClosed.get() )
-        {
-            future.cancel(true);
-        }
-        else
-        {
-            futures.add(future);
-            future.addListener(listener, MoreExecutors.sameThreadExecutor());
-        }
-        return future;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 8638ee6..737ff6b 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -1,100 +1,72 @@
 package org.apache.curator.utils;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on a ScheduledExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
  */
-public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+public class CloseableScheduledExecutorService extends CloseableExecutorService
 {
-    private final ListeningScheduledExecutorService executorService;
+    private final ScheduledExecutorService scheduledExecutorService;
 
     /**
-     * @param executorService the service to decorate
+     * @param scheduledExecutorService the service to decorate
      */
-    public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+    public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
     {
-        this.executorService = MoreExecutors.listeningDecorator(executorService);
-    }
-
-    @Override
-    protected ListeningExecutorService getService()
-    {
-        return executorService;
+        super(scheduledExecutorService);
+        this.scheduledExecutorService = scheduledExecutorService;
     }
 
     /**
-     * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
-     * and returns the future
+     * Creates and executes a one-shot action that becomes enabled
+     * after the given delay.
      *
-     * @param command the task to execute
+     * @param task  the task to execute
      * @param delay the time from now to delay execution
-     * @param unit the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of
+     * @param unit  the time unit of the delay parameter
+     * @return a Future representing pending completion of
      *         the task and whose <tt>get()</tt> method will return
      *         <tt>null</tt> upon completion
      */
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    public Future<?> schedule(Runnable task, long delay, TimeUnit unit)
     {
-        return record(executorService.schedule(command, delay, unit));
-    }
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
 
-    /**
-     * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
-     * and returns the future
-     *
-     * @param callable the task to execute
-     * @param delay the time from now to delay execution
-     * @param unit the time unit of the delay parameter
-     * @return a ScheduledFuture representing pending completion of
-     *         the task and whose <tt>get()</tt> method will return
-     *         <tt>null</tt> upon completion
-     */
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
-    {
-        return record(executorService.schedule(callable, delay, unit));
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        scheduledExecutorService.schedule(futureTask, delay, unit);
+        return futureTask;
     }
 
     /**
-     * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
-     * and returns the future
+     * Creates and executes a periodic action that becomes enabled first
+     * after the given initial delay, and subsequently with the
+     * given delay between the termination of one execution and the
+     * commencement of the next.  If any execution of the task
+     * encounters an exception, subsequent executions are suppressed.
+     * Otherwise, the task will only terminate via cancellation or
+     * termination of the executor.
      *
-     * @param command the task to execute
+     * @param task      the task to execute
      * @param initialDelay the time to delay first execution
-     * @param period the period between successive executions
-     * @param unit the time unit of the initialDelay and period parameters
-     * @return a ScheduledFuture representing pending completion of
+     * @param delay        the delay between the termination of one
+     *                     execution and the commencement of the next
+     * @param unit         the time unit of the initialDelay and delay parameters
+     * @return a Future representing pending completion of
      *         the task, and whose <tt>get()</tt> method will throw an
      *         exception upon cancellation
      */
-    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    public Future<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
     {
-        return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
-    }
+        Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
 
-    /**
-     * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
-     * and returns the future
-     *
-     * @param command the task to execute
-     * @param initialDelay the time to delay first execution
-     * @param delay the delay between the termination of one
-     * execution and the commencement of the next
-     * @param unit the time unit of the initialDelay and delay parameters
-     * @return a ScheduledFuture representing pending completion of
-     *         the task, and whose <tt>get()</tt> method will throw an
-     *         exception upon cancellation
-     */
-    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
-    {
-        return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+        scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit);
+        return futureTask;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
deleted file mode 100644
index 51fe6a4..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-
-public class FutureContainer implements Closeable
-{
-    private final List<Future<?>> futures = Lists.newArrayList();
-    private final ExecutorService executorService;
-
-    private class QueueingFuture<T> extends FutureTask<T>
-    {
-        private final RunnableFuture<T> task;
-
-        QueueingFuture(RunnableFuture<T> task)
-        {
-            super(task, null);
-            this.task = task;
-            futures.add(task);
-        }
-
-        protected void done()
-        {
-            futures.remove(task);
-        }
-    }
-
-    public FutureContainer(ExecutorService executorService)
-    {
-        this.executorService = executorService;
-    }
-
-    @VisibleForTesting
-    int size()
-    {
-        return futures.size();
-    }
-
-    @Override
-    public void close()
-    {
-        Iterator<Future<?>> iterator = futures.iterator();
-        while ( iterator.hasNext() )
-        {
-            Future<?> future = iterator.next();
-            iterator.remove();
-            if ( !future.cancel(true) )
-            {
-                System.err.println("Could not cancel");
-                throw new RuntimeException("Could not cancel");
-            }
-        }
-    }
-
-    /**
-     * Submits a value-returning task for execution and returns a Future
-     * representing the pending results of the task.  Upon completion,
-     * this task may be taken or polled.
-     *
-     * @param task the task to submit
-     */
-    public<V> void submit(Callable<V> task)
-    {
-        FutureTask<V> futureTask = new FutureTask<V>(task);
-        executorService.execute(new QueueingFuture<V>(futureTask));
-    }
-
-    /**
-     * Submits a Runnable task for execution and returns a Future
-     * representing that task.  Upon completion, this task may be
-     * taken or polled.
-     *
-     * @param task the task to submit
-     */
-    public void submit(Runnable task)
-    {
-        FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
-        executorService.execute(new QueueingFuture<Void>(futureTask));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
index 2cd2901..72b63fd 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -12,20 +12,17 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestCloseableExecutorService
 {
     private static final int QTY = 10;
 
     private volatile ExecutorService executorService;
-    private volatile AtomicInteger count;
 
     @BeforeMethod
     public void setup()
     {
         executorService = Executors.newFixedThreadPool(QTY * 2);
-        count = new AtomicInteger(0);
     }
 
     @AfterMethod
@@ -39,7 +36,7 @@ public class TestCloseableExecutorService
     {
         try
         {
-            FutureContainer service = new FutureContainer(executorService);
+            CloseableExecutorService service = new CloseableExecutorService(executorService);
             CountDownLatch startLatch = new CountDownLatch(QTY);
             CountDownLatch latch = new CountDownLatch(QTY);
             for ( int i = 0; i < QTY; ++i )
@@ -47,9 +44,9 @@ public class TestCloseableExecutorService
                 submitRunnable(service, startLatch, latch);
             }
 
-            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
             service.close();
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
         }
         catch ( AssertionError e )
         {
@@ -65,41 +62,39 @@ public class TestCloseableExecutorService
     public void testBasicCallable() throws InterruptedException
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
-        List<CountDownLatch> latches = Lists.newArrayList();
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
+        final CountDownLatch latch = new CountDownLatch(QTY);
         for ( int i = 0; i < QTY; ++i )
         {
-            final CountDownLatch latch = new CountDownLatch(1);
-            latches.add(latch);
             service.submit
-                (
-                    new Callable<Void>()
+            (
+                new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
                     {
-                        @Override
-                        public Void call() throws Exception
+                        try
+                        {
+                            startLatch.countDown();
+                            Thread.currentThread().join();
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            Thread.currentThread().interrupt();
+                        }
+                        finally
                         {
-                            try
-                            {
-                                Thread.currentThread().join();
-                            }
-                            catch ( InterruptedException e )
-                            {
-                                Thread.currentThread().interrupt();
-                            }
-                            finally
-                            {
-                                latch.countDown();
-                            }
-                            return null;
+                            latch.countDown();
                         }
+                        return null;
                     }
-                );
+                }
+            );
         }
 
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
         service.close();
-        for ( CountDownLatch latch : latches )
-        {
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
-        }
+        Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
     }
 
     @Test
@@ -107,6 +102,7 @@ public class TestCloseableExecutorService
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
         List<Future<?>> futures = Lists.newArrayList();
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
         for ( int i = 0; i < QTY; ++i )
         {
             Future<?> future = service.submit
@@ -118,6 +114,7 @@ public class TestCloseableExecutorService
                     {
                         try
                         {
+                            startLatch.countDown();
                             Thread.currentThread().join();
                         }
                         catch ( InterruptedException e )
@@ -130,7 +127,7 @@ public class TestCloseableExecutorService
             futures.add(future);
         }
 
-        Thread.sleep(100);
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
 
         for ( Future<?> future : futures )
         {
@@ -144,6 +141,7 @@ public class TestCloseableExecutorService
     public void testListeningCallable() throws InterruptedException
     {
         CloseableExecutorService service = new CloseableExecutorService(executorService);
+        final CountDownLatch startLatch = new CountDownLatch(QTY);
         List<Future<?>> futures = Lists.newArrayList();
         for ( int i = 0; i < QTY; ++i )
         {
@@ -156,6 +154,7 @@ public class TestCloseableExecutorService
                     {
                         try
                         {
+                            startLatch.countDown();
                             Thread.currentThread().join();
                         }
                         catch ( InterruptedException e )
@@ -169,8 +168,7 @@ public class TestCloseableExecutorService
             futures.add(future);
         }
 
-        Thread.sleep(100);
-
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
         for ( Future<?> future : futures )
         {
             future.cancel(true);
@@ -182,69 +180,52 @@ public class TestCloseableExecutorService
     @Test
     public void testPartialRunnable() throws InterruptedException
     {
-        try
-        {
-            final CountDownLatch outsideLatch = new CountDownLatch(1);
-            executorService.submit
-            (
-                new Runnable()
+        final CountDownLatch outsideLatch = new CountDownLatch(1);
+        executorService.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
+                    try
                     {
-                        try
-                        {
-                            Thread.currentThread().join();
-                        }
-                        catch ( InterruptedException e )
-                        {
-                            Thread.currentThread().interrupt();
-                        }
-                        finally
-                        {
-                            outsideLatch.countDown();
-                        }
+                        Thread.currentThread().join();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                    finally
+                    {
+                        outsideLatch.countDown();
                     }
                 }
-            );
-
-            FutureContainer service = new FutureContainer(executorService);
-            CountDownLatch startLatch = new CountDownLatch(QTY);
-            CountDownLatch latch = new CountDownLatch(QTY);
-            for ( int i = 0; i < QTY; ++i )
-            {
-                submitRunnable(service, startLatch, latch);
             }
+        );
 
-            while ( service.size() < QTY )
-            {
-                Thread.sleep(100);
-            }
-
-            Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
-            service.close();
-            Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
-            Assert.assertEquals(outsideLatch.getCount(), 1);
-        }
-        catch ( AssertionError e )
-        {
-            throw e;
-        }
-        catch ( Throwable e )
+        CloseableExecutorService service = new CloseableExecutorService(executorService);
+        CountDownLatch startLatch = new CountDownLatch(QTY);
+        CountDownLatch latch = new CountDownLatch(QTY);
+        for ( int i = 0; i < QTY; ++i )
         {
-            e.printStackTrace();
+            submitRunnable(service, startLatch, latch);
         }
-        finally
+
+        while ( service.size() < QTY )
         {
-            executorService.shutdownNow();
+            Thread.sleep(100);
         }
+
+        Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
+        service.close();
+        Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+        Assert.assertEquals(outsideLatch.getCount(), 1);
     }
 
-    private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+    private void submitRunnable(CloseableExecutorService service, final CountDownLatch startLatch, final CountDownLatch latch)
     {
-        try
-        {
-            service.submit
+        service.submit
             (
                 new Runnable()
                 {
@@ -254,29 +235,18 @@ public class TestCloseableExecutorService
                         try
                         {
                             startLatch.countDown();
-                            count.incrementAndGet();
                             Thread.sleep(100000);
                         }
                         catch ( InterruptedException e )
                         {
                             Thread.currentThread().interrupt();
                         }
-                        catch ( Throwable e )
-                        {
-                            e.printStackTrace();
-                        }
                         finally
                         {
-    //                        count.decrementAndGet();
                             latch.countDown();
                         }
                     }
                 }
             );
-        }
-        catch ( Throwable e )
-        {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index f42039c..61c3af7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -263,16 +263,16 @@ public class PathChildrenCache implements Closeable
 
         client.getConnectionStateListenable().addListener(connectionStateListener);
         executorService.submit
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
+            (
+                new Runnable()
                 {
-                    mainLoop();
+                    @Override
+                    public void run()
+                    {
+                        mainLoop();
+                    }
                 }
-            }
-        );
+            );
 
         switch ( mode )
         {

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index b0717ed..c8c1510 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.data.Stat;
@@ -29,8 +30,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,10 +47,10 @@ public class ChildReaper implements Closeable
     private final CuratorFramework client;
     private final String path;
     private final Reaper.Mode mode;
-    private final ScheduledExecutorService executor;
+    private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
 
-    private volatile ScheduledFuture<?> task;
+    private volatile Future<?> task;
 
     private enum State
     {
@@ -91,7 +92,7 @@ public class ChildReaper implements Closeable
         this.client = client;
         this.path = path;
         this.mode = mode;
-        this.executor = executor;
+        this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs;
         this.reaper = new Reaper(client, executor, reapingThresholdMs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index b540689..037eacd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -153,7 +154,7 @@ public class Reaper implements Closeable
     public void addPath(String path, Mode mode)
     {
         activePaths.add(path);
-        executor.schedule(new PathHolder(path, mode, 0), reapingThresholdMs, TimeUnit.MILLISECONDS);
+        schedule(new PathHolder(path, mode, 0), reapingThresholdMs);
     }
 
     /**
@@ -186,6 +187,12 @@ public class Reaper implements Closeable
         }
     }
 
+    @VisibleForTesting
+    protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
+    {
+        return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+    }
+
     private void reap(PathHolder holder)
     {
         if ( !activePaths.contains(holder.path) )
@@ -251,7 +258,7 @@ public class Reaper implements Closeable
         }
         else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) )
         {
-            executor.schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs, TimeUnit.MILLISECONDS);
+            schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index 596960d..bd821e4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -19,6 +19,7 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.io.Closeables;
+import junit.framework.Assert;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -27,13 +28,20 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.Timing;
-import junit.framework.Assert;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.testng.annotations.Test;
 import java.io.IOException;
 import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 public class TestReaper extends BaseClassForTests
 {
@@ -106,37 +114,36 @@ public class TestReaper extends BaseClassForTests
 
             final Queue<Reaper.PathHolder>  holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
             final ExecutorService           pool = Executors.newCachedThreadPool();
-            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1)
+            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
+
+            reaper = new Reaper
+            (
+                client,
+                service,
+                THRESHOLD
+            )
             {
                 @Override
-                public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+                protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
                 {
-                    final Reaper.PathHolder     pathHolder = (Reaper.PathHolder)command;
                     holders.add(pathHolder);
-                    final ScheduledFuture<?>    f = super.schedule(command, delay, unit);
+                    final Future<?>    f = super.schedule(pathHolder, reapingThresholdMs);
                     pool.submit
-                    (
-                        new Callable<Void>()
-                        {
-                            @Override
-                            public Void call() throws Exception
+                        (
+                            new Callable<Void>()
                             {
-                                f.get();
-                                holders.remove(pathHolder);
-                                return null;
+                                @Override
+                                public Void call() throws Exception
+                                {
+                                    f.get();
+                                    holders.remove(pathHolder);
+                                    return null;
+                                }
                             }
-                        }
-                    );
-                    return f;
+                        );
+                    return null;
                 }
             };
-
-            reaper = new Reaper
-            (
-                client,
-                service,
-                THRESHOLD
-            );
             reaper.start();
             reaper.addPath("/one/two/three");