You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/17 22:39:13 UTC
[03/12] git commit: CURATOR-17 PathChildrenCache.close() calls
shutdownNow() on its executor, always. Instead,
it Curator (in general) should only close tasks it has started. This is now
done via wrapped executors in a new class that tracks tasks started
CURATOR-17
PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, it Curator (in general) should only close tasks it has started. This is now done
via wrapped executors in a new class that tracks tasks started by Curator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/10df9fc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/10df9fc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/10df9fc2
Branch: refs/heads/master
Commit: 10df9fc24a2c36eac9812af1cc4c10c0bab1ae9b
Parents: 86b82ab
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 13:11:49 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 13:11:49 2013 -0700
----------------------------------------------------------------------
.../curator/utils/CloseableExecutorService.java | 111 +++++++++-
.../utils/CloseableExecutorServiceBase.java | 124 -----------
.../utils/CloseableScheduledExecutorService.java | 104 ++++------
.../org/apache/curator/utils/FutureContainer.java | 91 --------
.../utils/TestCloseableExecutorService.java | 160 ++++++---------
.../framework/recipes/cache/PathChildrenCache.java | 16 +-
.../framework/recipes/locks/ChildReaper.java | 9 +-
.../curator/framework/recipes/locks/Reaper.java | 11 +-
.../framework/recipes/locks/TestReaper.java | 55 +++---
9 files changed, 258 insertions(+), 423 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index cf92ef4..4024d29 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -1,28 +1,121 @@
package org.apache.curator.utils;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on an ExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
*/
-public class CloseableExecutorService extends CloseableExecutorServiceBase
+public class CloseableExecutorService implements Closeable
{
- private final ListeningExecutorService executorService;
+ private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+ private final ExecutorService executorService;
+ protected final AtomicBoolean isOpen = new AtomicBoolean(true);
+
+ protected class InternalFutureTask<T> extends FutureTask<T>
+ {
+ private final RunnableFuture<T> task;
+
+ InternalFutureTask(RunnableFuture<T> task)
+ {
+ super(task, null);
+ this.task = task;
+ futures.add(task);
+ }
+
+ protected void done()
+ {
+ futures.remove(task);
+ }
+ }
/**
* @param executorService the service to decorate
*/
public CloseableExecutorService(ExecutorService executorService)
{
- this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.executorService = executorService;
+ }
+
+ /**
+ * Returns <tt>true</tt> if this executor has been shut down.
+ *
+ * @return <tt>true</tt> if this executor has been shut down
+ */
+ public boolean isShutdown()
+ {
+ return !isOpen.get();
}
+ @VisibleForTesting
+ int size()
+ {
+ return futures.size();
+ }
+
+ /**
+ * Closes any tasks currently in progress
+ */
@Override
- protected ListeningExecutorService getService()
+ public void close()
{
- return executorService;
+ isOpen.set(false);
+ Iterator<Future<?>> iterator = futures.iterator();
+ while ( iterator.hasNext() )
+ {
+ Future<?> future = iterator.next();
+ iterator.remove();
+ if ( !future.cancel(true) )
+ {
+ System.err.println("Could not cancel");
+ throw new RuntimeException("Could not cancel");
+ }
+ }
+ }
+
+ /**
+ * Submits a value-returning task for execution and returns a Future
+ * representing the pending results of the task. Upon completion,
+ * this task may be taken or polled.
+ *
+ * @param task the task to submit
+ * @return a future to watch the task
+ */
+ public<V> Future<V> submit(Callable<V> task)
+ {
+ Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+ InternalFutureTask<V> futureTask = new InternalFutureTask<V>(new FutureTask<V>(task));
+ executorService.execute(futureTask);
+ return futureTask;
+ }
+
+ /**
+ * Submits a Runnable task for execution and returns a Future
+ * representing that task. Upon completion, this task may be
+ * taken or polled.
+ *
+ * @param task the task to submit
+ * @return a future to watch the task
+ */
+ public Future<?> submit(Runnable task)
+ {
+ Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
+
+ InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+ executorService.execute(futureTask);
+ return futureTask;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
deleted file mode 100644
index 92371d7..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
- */
-abstract class CloseableExecutorServiceBase implements Closeable
-{
- private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
- protected abstract ListeningExecutorService getService();
-
- @Override
- public void close()
- {
- isClosed.set(true);
- Iterator<Future<?>> iterator = futures.iterator();
- while ( iterator.hasNext() )
- {
- Future<?> future = iterator.next();
- iterator.remove();
- future.cancel(true);
- }
- }
-
- /**
- * @see ExecutorService#isShutdown()
- * @return true/false
- */
- public boolean isShutdown()
- {
- return getService().isShutdown();
- }
-
- /**
- * @see ExecutorService#isTerminated()
- * @return true/false
- */
- public boolean isTerminated()
- {
- return getService().isTerminated();
- }
-
- /**
- * Calls {@link ExecutorService#submit(Callable)}, records
- * and returns the future
- *
- * @param task task to submit
- * @return the future
- */
- public <T> Future<T> submit(Callable<T> task)
- {
- return record(getService().submit(task));
- }
-
- /**
- * Calls {@link ExecutorService#submit(Runnable)}, records
- * and returns the future
- *
- * @param task task to submit
- * @return the future
- */
- public Future<?> submit(Runnable task)
- {
- return record(getService().submit(task));
- }
-
- @VisibleForTesting
- int size()
- {
- return futures.size();
- }
-
- protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
- {
- if ( isClosed.get() )
- {
- future.cancel(true);
- }
- else
- {
- futures.add(future);
- }
- return future;
- }
-
- protected <T> Future<T> record(final ListenableFuture<T> future)
- {
- Runnable listener = new Runnable()
- {
- @Override
- public void run()
- {
- futures.remove(future);
- }
- };
- if ( isClosed.get() )
- {
- future.cancel(true);
- }
- else
- {
- futures.add(future);
- future.addListener(listener, MoreExecutors.sameThreadExecutor());
- }
- return future;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 8638ee6..737ff6b 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -1,100 +1,72 @@
package org.apache.curator.utils;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
- * Decorates an {@link ExecutorService} such that submitted tasks
- * are recorded and can be closed en masse.
+ * Decoration on an ScheduledExecutorService that tracks created futures and provides
+ * a method to close futures created via this class
*/
-public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+public class CloseableScheduledExecutorService extends CloseableExecutorService
{
- private final ListeningScheduledExecutorService executorService;
+ private final ScheduledExecutorService scheduledExecutorService;
/**
- * @param executorService the service to decorate
+ * @param scheduledExecutorService the service to decorate
*/
- public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+ public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService)
{
- this.executorService = MoreExecutors.listeningDecorator(executorService);
- }
-
- @Override
- protected ListeningExecutorService getService()
- {
- return executorService;
+ super(scheduledExecutorService);
+ this.scheduledExecutorService = scheduledExecutorService;
}
/**
- * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
- * and returns the future
+ * Creates and executes a one-shot action that becomes enabled
+ * after the given delay.
*
- * @param command the task to execute
+ * @param task the task to execute
* @param delay the time from now to delay execution
- * @param unit the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of
+ * @param unit the time unit of the delay parameter
+ * @return a Future representing pending completion of
* the task and whose <tt>get()</tt> method will return
* <tt>null</tt> upon completion
*/
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ public Future<?> schedule(Runnable task, long delay, TimeUnit unit)
{
- return record(executorService.schedule(command, delay, unit));
- }
+ Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
- /**
- * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
- * and returns the future
- *
- * @param callable the task to execute
- * @param delay the time from now to delay execution
- * @param unit the time unit of the delay parameter
- * @return a ScheduledFuture representing pending completion of
- * the task and whose <tt>get()</tt> method will return
- * <tt>null</tt> upon completion
- */
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
- {
- return record(executorService.schedule(callable, delay, unit));
+ InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+ scheduledExecutorService.schedule(futureTask, delay, unit);
+ return futureTask;
}
/**
- * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
- * and returns the future
+ * Creates and executes a periodic action that becomes enabled first
+ * after the given initial delay, and subsequently with the
+ * given delay between the termination of one execution and the
+ * commencement of the next. If any execution of the task
+ * encounters an exception, subsequent executions are suppressed.
+ * Otherwise, the task will only terminate via cancellation or
+ * termination of the executor.
*
- * @param command the task to execute
+ * @param task the task to execute
* @param initialDelay the time to delay first execution
- * @param period the period between successive executions
- * @param unit the time unit of the initialDelay and period parameters
- * @return a ScheduledFuture representing pending completion of
+ * @param delay the delay between the termination of one
+ * execution and the commencement of the next
+ * @param unit the time unit of the initialDelay and delay parameters
+ * @return a Future representing pending completion of
* the task, and whose <tt>get()</tt> method will throw an
* exception upon cancellation
*/
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ public Future<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
{
- return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
- }
+ Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
- /**
- * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
- * and returns the future
- *
- * @param command the task to execute
- * @param initialDelay the time to delay first execution
- * @param delay the delay between the termination of one
- * execution and the commencement of the next
- * @param unit the time unit of the initialDelay and delay parameters
- * @return a ScheduledFuture representing pending completion of
- * the task, and whose <tt>get()</tt> method will throw an
- * exception upon cancellation
- */
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- {
- return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+ InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
+ scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit);
+ return futureTask;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
deleted file mode 100644
index 51fe6a4..0000000
--- a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.curator.utils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import java.io.Closeable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-
-public class FutureContainer implements Closeable
-{
- private final List<Future<?>> futures = Lists.newArrayList();
- private final ExecutorService executorService;
-
- private class QueueingFuture<T> extends FutureTask<T>
- {
- private final RunnableFuture<T> task;
-
- QueueingFuture(RunnableFuture<T> task)
- {
- super(task, null);
- this.task = task;
- futures.add(task);
- }
-
- protected void done()
- {
- futures.remove(task);
- }
- }
-
- public FutureContainer(ExecutorService executorService)
- {
- this.executorService = executorService;
- }
-
- @VisibleForTesting
- int size()
- {
- return futures.size();
- }
-
- @Override
- public void close()
- {
- Iterator<Future<?>> iterator = futures.iterator();
- while ( iterator.hasNext() )
- {
- Future<?> future = iterator.next();
- iterator.remove();
- if ( !future.cancel(true) )
- {
- System.err.println("Could not cancel");
- throw new RuntimeException("Could not cancel");
- }
- }
- }
-
- /**
- * Submits a value-returning task for execution and returns a Future
- * representing the pending results of the task. Upon completion,
- * this task may be taken or polled.
- *
- * @param task the task to submit
- */
- public<V> void submit(Callable<V> task)
- {
- FutureTask<V> futureTask = new FutureTask<V>(task);
- executorService.execute(new QueueingFuture<V>(futureTask));
- }
-
- /**
- * Submits a Runnable task for execution and returns a Future
- * representing that task. Upon completion, this task may be
- * taken or polled.
- *
- * @param task the task to submit
- */
- public void submit(Runnable task)
- {
- FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
- executorService.execute(new QueueingFuture<Void>(futureTask));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
index 2cd2901..72b63fd 100644
--- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -12,20 +12,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
public class TestCloseableExecutorService
{
private static final int QTY = 10;
private volatile ExecutorService executorService;
- private volatile AtomicInteger count;
@BeforeMethod
public void setup()
{
executorService = Executors.newFixedThreadPool(QTY * 2);
- count = new AtomicInteger(0);
}
@AfterMethod
@@ -39,7 +36,7 @@ public class TestCloseableExecutorService
{
try
{
- FutureContainer service = new FutureContainer(executorService);
+ CloseableExecutorService service = new CloseableExecutorService(executorService);
CountDownLatch startLatch = new CountDownLatch(QTY);
CountDownLatch latch = new CountDownLatch(QTY);
for ( int i = 0; i < QTY; ++i )
@@ -47,9 +44,9 @@ public class TestCloseableExecutorService
submitRunnable(service, startLatch, latch);
}
- Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
service.close();
- Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}
catch ( AssertionError e )
{
@@ -65,41 +62,39 @@ public class TestCloseableExecutorService
public void testBasicCallable() throws InterruptedException
{
CloseableExecutorService service = new CloseableExecutorService(executorService);
- List<CountDownLatch> latches = Lists.newArrayList();
+ final CountDownLatch startLatch = new CountDownLatch(QTY);
+ final CountDownLatch latch = new CountDownLatch(QTY);
for ( int i = 0; i < QTY; ++i )
{
- final CountDownLatch latch = new CountDownLatch(1);
- latches.add(latch);
service.submit
- (
- new Callable<Void>()
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
{
- @Override
- public Void call() throws Exception
+ try
+ {
+ startLatch.countDown();
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
{
- try
- {
- Thread.currentThread().join();
- }
- catch ( InterruptedException e )
- {
- Thread.currentThread().interrupt();
- }
- finally
- {
- latch.countDown();
- }
- return null;
+ latch.countDown();
}
+ return null;
}
- );
+ }
+ );
}
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
service.close();
- for ( CountDownLatch latch : latches )
- {
- Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
- }
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
}
@Test
@@ -107,6 +102,7 @@ public class TestCloseableExecutorService
{
CloseableExecutorService service = new CloseableExecutorService(executorService);
List<Future<?>> futures = Lists.newArrayList();
+ final CountDownLatch startLatch = new CountDownLatch(QTY);
for ( int i = 0; i < QTY; ++i )
{
Future<?> future = service.submit
@@ -118,6 +114,7 @@ public class TestCloseableExecutorService
{
try
{
+ startLatch.countDown();
Thread.currentThread().join();
}
catch ( InterruptedException e )
@@ -130,7 +127,7 @@ public class TestCloseableExecutorService
futures.add(future);
}
- Thread.sleep(100);
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
for ( Future<?> future : futures )
{
@@ -144,6 +141,7 @@ public class TestCloseableExecutorService
public void testListeningCallable() throws InterruptedException
{
CloseableExecutorService service = new CloseableExecutorService(executorService);
+ final CountDownLatch startLatch = new CountDownLatch(QTY);
List<Future<?>> futures = Lists.newArrayList();
for ( int i = 0; i < QTY; ++i )
{
@@ -156,6 +154,7 @@ public class TestCloseableExecutorService
{
try
{
+ startLatch.countDown();
Thread.currentThread().join();
}
catch ( InterruptedException e )
@@ -169,8 +168,7 @@ public class TestCloseableExecutorService
futures.add(future);
}
- Thread.sleep(100);
-
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
for ( Future<?> future : futures )
{
future.cancel(true);
@@ -182,69 +180,52 @@ public class TestCloseableExecutorService
@Test
public void testPartialRunnable() throws InterruptedException
{
- try
- {
- final CountDownLatch outsideLatch = new CountDownLatch(1);
- executorService.submit
- (
- new Runnable()
+ final CountDownLatch outsideLatch = new CountDownLatch(1);
+ executorService.submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public void run()
+ try
{
- try
- {
- Thread.currentThread().join();
- }
- catch ( InterruptedException e )
- {
- Thread.currentThread().interrupt();
- }
- finally
- {
- outsideLatch.countDown();
- }
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ outsideLatch.countDown();
}
}
- );
-
- FutureContainer service = new FutureContainer(executorService);
- CountDownLatch startLatch = new CountDownLatch(QTY);
- CountDownLatch latch = new CountDownLatch(QTY);
- for ( int i = 0; i < QTY; ++i )
- {
- submitRunnable(service, startLatch, latch);
}
+ );
- while ( service.size() < QTY )
- {
- Thread.sleep(100);
- }
-
- Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
- service.close();
- Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
- Assert.assertEquals(outsideLatch.getCount(), 1);
- }
- catch ( AssertionError e )
- {
- throw e;
- }
- catch ( Throwable e )
+ CloseableExecutorService service = new CloseableExecutorService(executorService);
+ CountDownLatch startLatch = new CountDownLatch(QTY);
+ CountDownLatch latch = new CountDownLatch(QTY);
+ for ( int i = 0; i < QTY; ++i )
{
- e.printStackTrace();
+ submitRunnable(service, startLatch, latch);
}
- finally
+
+ while ( service.size() < QTY )
{
- executorService.shutdownNow();
+ Thread.sleep(100);
}
+
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS));
+ service.close();
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+ Assert.assertEquals(outsideLatch.getCount(), 1);
}
- private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+ private void submitRunnable(CloseableExecutorService service, final CountDownLatch startLatch, final CountDownLatch latch)
{
- try
- {
- service.submit
+ service.submit
(
new Runnable()
{
@@ -254,29 +235,18 @@ public class TestCloseableExecutorService
try
{
startLatch.countDown();
- count.incrementAndGet();
Thread.sleep(100000);
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
- catch ( Throwable e )
- {
- e.printStackTrace();
- }
finally
{
- // count.decrementAndGet();
latch.countDown();
}
}
}
);
- }
- catch ( Throwable e )
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index f42039c..61c3af7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -263,16 +263,16 @@ public class PathChildrenCache implements Closeable
client.getConnectionStateListenable().addListener(connectionStateListener);
executorService.submit
- (
- new Runnable()
- {
- @Override
- public void run()
+ (
+ new Runnable()
{
- mainLoop();
+ @Override
+ public void run()
+ {
+ mainLoop();
+ }
}
- }
- );
+ );
switch ( mode )
{
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index b0717ed..c8c1510 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
@@ -29,8 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -46,10 +47,10 @@ public class ChildReaper implements Closeable
private final CuratorFramework client;
private final String path;
private final Reaper.Mode mode;
- private final ScheduledExecutorService executor;
+ private final CloseableScheduledExecutorService executor;
private final int reapingThresholdMs;
- private volatile ScheduledFuture<?> task;
+ private volatile Future<?> task;
private enum State
{
@@ -91,7 +92,7 @@ public class ChildReaper implements Closeable
this.client = client;
this.path = path;
this.mode = mode;
- this.executor = executor;
+ this.executor = new CloseableScheduledExecutorService(executor);
this.reapingThresholdMs = reapingThresholdMs;
this.reaper = new Reaper(client, executor, reapingThresholdMs);
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index b540689..037eacd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -153,7 +154,7 @@ public class Reaper implements Closeable
public void addPath(String path, Mode mode)
{
activePaths.add(path);
- executor.schedule(new PathHolder(path, mode, 0), reapingThresholdMs, TimeUnit.MILLISECONDS);
+ schedule(new PathHolder(path, mode, 0), reapingThresholdMs);
}
/**
@@ -186,6 +187,12 @@ public class Reaper implements Closeable
}
}
+ @VisibleForTesting
+ protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
+ {
+ return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+ }
+
private void reap(PathHolder holder)
{
if ( !activePaths.contains(holder.path) )
@@ -251,7 +258,7 @@ public class Reaper implements Closeable
}
else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) )
{
- executor.schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs, TimeUnit.MILLISECONDS);
+ schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index 596960d..bd821e4 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.io.Closeables;
+import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -27,13 +28,20 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
-import junit.framework.Assert;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
public class TestReaper extends BaseClassForTests
{
@@ -106,37 +114,36 @@ public class TestReaper extends BaseClassForTests
final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
final ExecutorService pool = Executors.newCachedThreadPool();
- ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1)
+ ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
+
+ reaper = new Reaper
+ (
+ client,
+ service,
+ THRESHOLD
+ )
{
@Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
{
- final Reaper.PathHolder pathHolder = (Reaper.PathHolder)command;
holders.add(pathHolder);
- final ScheduledFuture<?> f = super.schedule(command, delay, unit);
+ final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
pool.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
+ (
+ new Callable<Void>()
{
- f.get();
- holders.remove(pathHolder);
- return null;
+ @Override
+ public Void call() throws Exception
+ {
+ f.get();
+ holders.remove(pathHolder);
+ return null;
+ }
}
- }
- );
- return f;
+ );
+ return null;
}
};
-
- reaper = new Reaper
- (
- client,
- service,
- THRESHOLD
- );
reaper.start();
reaper.addPath("/one/two/three");