You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/05/17 22:39:12 UTC
[02/12] git commit: temp checkin
temp checkin
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/86b82ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/86b82ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/86b82ab6
Branch: refs/heads/master
Commit: 86b82ab61d951429121e2f869bc00ff4cede9407
Parents: 97cda39
Author: randgalt <ra...@apache.org>
Authored: Mon May 6 11:56:19 2013 -0700
Committer: randgalt <ra...@apache.org>
Committed: Mon May 6 11:56:19 2013 -0700
----------------------------------------------------------------------
.../curator/utils/CloseableExecutorService.java | 28 ++
.../utils/CloseableExecutorServiceBase.java | 124 +++++++
.../utils/CloseableScheduledExecutorService.java | 100 +++++
.../org/apache/curator/utils/FutureContainer.java | 91 +++++
.../utils/TestCloseableExecutorService.java | 282 +++++++++++++++
.../framework/recipes/cache/PathChildrenCache.java | 25 +-
.../curator/framework/recipes/locks/Reaper.java | 14 +-
7 files changed, 642 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
new file mode 100644
index 0000000..cf92ef4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -0,0 +1,28 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Decorates an {@link ExecutorService} such that submitted tasks
+ * are recorded and can be closed en masse.
+ * <p>
+ * The supplied service is wrapped with {@link MoreExecutors#listeningDecorator(ExecutorService)};
+ * the submit, record and close logic is inherited from {@link CloseableExecutorServiceBase}.
+ */
+public class CloseableExecutorService extends CloseableExecutorServiceBase
+{
+ // the caller's executor wrapped in Guava's listening decorator
+ private final ListeningExecutorService executorService;
+
+ /**
+ * @param executorService the service to decorate
+ */
+ public CloseableExecutorService(ExecutorService executorService)
+ {
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ }
+
+ /**
+ * @return the listening decorator created in the constructor
+ */
+ @Override
+ protected ListeningExecutorService getService()
+ {
+ return executorService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
new file mode 100644
index 0000000..92371d7
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java
@@ -0,0 +1,124 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for decorators of an {@link ExecutorService} that record the
+ * futures of submitted tasks so that they can be cancelled en masse via
+ * {@link #close()}.
+ * <p>
+ * Closing only cancels the recorded futures; nothing in this class shuts down
+ * the decorated service.
+ */
+abstract class CloseableExecutorServiceBase implements Closeable
+{
+    // Concurrent set: submitting threads add, completion listeners remove, close() iterates.
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    /**
+     * @return the decorated service that tasks are submitted to
+     */
+    protected abstract ListeningExecutorService getService();
+
+    /**
+     * Marks this instance as closed and cancels, with interruption, every
+     * future that is currently recorded. Futures passed to {@code record()}
+     * after this call are cancelled instead of being recorded.
+     */
+    @Override
+    public void close()
+    {
+        isClosed.set(true);
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            future.cancel(true);
+        }
+    }
+
+    /**
+     * @see ExecutorService#isShutdown()
+     * @return true/false
+     */
+    public boolean isShutdown()
+    {
+        return getService().isShutdown();
+    }
+
+    /**
+     * @see ExecutorService#isTerminated()
+     * @return true/false
+     */
+    public boolean isTerminated()
+    {
+        return getService().isTerminated();
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Callable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return record(getService().submit(task));
+    }
+
+    /**
+     * Calls {@link ExecutorService#submit(Runnable)}, records
+     * and returns the future
+     *
+     * @param task task to submit
+     * @return the future
+     */
+    public Future<?> submit(Runnable task)
+    {
+        return record(getService().submit(task));
+    }
+
+    /**
+     * @return the number of futures currently recorded
+     */
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    /**
+     * Records a scheduled future, or cancels it if this instance is closed.
+     * <p>
+     * NOTE(review): no completion hook is attached here, so a scheduled future
+     * stays in the recorded set until {@link #close()} removes it, even after
+     * it has completed.
+     *
+     * @param future the future to record
+     * @return the same future
+     */
+    protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future)
+    {
+        track(future);
+        return future;
+    }
+
+    /**
+     * Records a listenable future, or cancels it if this instance is closed.
+     * A recorded future removes itself from the set once it is done.
+     *
+     * @param future the future to record
+     * @return the same future
+     */
+    protected <T> Future<T> record(final ListenableFuture<T> future)
+    {
+        if ( track(future) )
+        {
+            Runnable listener = new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    futures.remove(future);
+                }
+            };
+            future.addListener(listener, MoreExecutors.sameThreadExecutor());
+        }
+        return future;
+    }
+
+    /**
+     * Adds the future to the recorded set unless this instance is closed, in
+     * which case the future is cancelled instead.
+     *
+     * @param future the future to track
+     * @return true if the future is now recorded, false if it was cancelled
+     */
+    private boolean track(Future<?> future)
+    {
+        if ( isClosed.get() )
+        {
+            future.cancel(true);
+            return false;
+        }
+
+        futures.add(future);
+        // close() may have run between the check above and the add, in which case its
+        // iteration can miss this future; re-check so a late arrival is still cancelled.
+        if ( isClosed.get() )
+        {
+            futures.remove(future);
+            future.cancel(true);
+            return false;
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
new file mode 100644
index 0000000..8638ee6
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -0,0 +1,100 @@
+package org.apache.curator.utils;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Decorates a {@link ScheduledExecutorService} such that scheduled tasks
+ * are recorded and can be closed en masse. The plain {@link ExecutorService}
+ * style submit methods are inherited from {@link CloseableExecutorServiceBase}.
+ */
+public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase
+{
+ // the caller's scheduler wrapped in Guava's listening decorator
+ private final ListeningScheduledExecutorService executorService;
+
+ /**
+ * @param executorService the service to decorate
+ */
+ public CloseableScheduledExecutorService(ScheduledExecutorService executorService)
+ {
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ }
+
+ /**
+ * @return the listening decorator created in the constructor
+ */
+ @Override
+ protected ListeningExecutorService getService()
+ {
+ return executorService;
+ }
+
+ /**
+ * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records
+ * and returns the future
+ *
+ * @param command the task to execute
+ * @param delay the time from now to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture representing pending completion of
+ * the task and whose <tt>get()</tt> method will return
+ * <tt>null</tt> upon completion
+ */
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ return record(executorService.schedule(command, delay, unit));
+ }
+
+ /**
+ * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records
+ * and returns the future
+ *
+ * @param callable the task to execute
+ * @param delay the time from now to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture that can be used to extract the
+ * callable's result or to cancel it
+ */
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+ {
+ return record(executorService.schedule(callable, delay, unit));
+ }
+
+ /**
+ * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records
+ * and returns the future
+ *
+ * @param command the task to execute
+ * @param initialDelay the time to delay first execution
+ * @param period the period between successive executions
+ * @param unit the time unit of the initialDelay and period parameters
+ * @return a ScheduledFuture representing pending completion of
+ * the task, and whose <tt>get()</tt> method will throw an
+ * exception upon cancellation
+ */
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit));
+ }
+
+ /**
+ * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records
+ * and returns the future
+ *
+ * @param command the task to execute
+ * @param initialDelay the time to delay first execution
+ * @param delay the delay between the termination of one
+ * execution and the commencement of the next
+ * @param unit the time unit of the initialDelay and delay parameters
+ * @return a ScheduledFuture representing pending completion of
+ * the task, and whose <tt>get()</tt> method will throw an
+ * exception upon cancellation
+ */
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
new file mode 100644
index 0000000..51fe6a4
--- /dev/null
+++ b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java
@@ -0,0 +1,91 @@
+package org.apache.curator.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Runs tasks on an {@link ExecutorService} and remembers each one until it
+ * completes, so that all outstanding tasks can be cancelled together via
+ * {@link #close()}. The executor itself is never shut down by this class.
+ */
+public class FutureContainer implements Closeable
+{
+    // Touched concurrently: submitting threads add, executor threads remove in done(),
+    // and close() iterates - so this must be a concurrent collection.
+    private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap());
+    private final ExecutorService executorService;
+
+    /**
+     * Wrapper handed to the executor. It registers the wrapped task on creation
+     * and unregisters it when the wrapper finishes for any reason.
+     */
+    private class QueueingFuture<T> extends FutureTask<T>
+    {
+        private final RunnableFuture<T> task;
+
+        QueueingFuture(RunnableFuture<T> task)
+        {
+            super(task, null);
+            this.task = task;
+            futures.add(task);
+        }
+
+        @Override
+        protected void done()
+        {
+            futures.remove(task);
+        }
+    }
+
+    /**
+     * @param executorService the service used to run submitted tasks
+     */
+    public FutureContainer(ExecutorService executorService)
+    {
+        this.executorService = executorService;
+    }
+
+    /**
+     * @return the number of tasks currently registered
+     */
+    @VisibleForTesting
+    int size()
+    {
+        return futures.size();
+    }
+
+    /**
+     * Cancels, with interruption, every task that is still registered.
+     */
+    @Override
+    public void close()
+    {
+        Iterator<Future<?>> iterator = futures.iterator();
+        while ( iterator.hasNext() )
+        {
+            Future<?> future = iterator.next();
+            iterator.remove();
+            // cancel() returns false when the task already completed or was already
+            // cancelled; that is a benign race with done(), not a failure of close().
+            future.cancel(true);
+        }
+    }
+
+    /**
+     * Submits a value-returning task for execution. The task is tracked
+     * until it completes so that {@link #close()} can cancel it; no
+     * handle to the task or its result is returned.
+     *
+     * @param task the task to submit
+     */
+    public<V> void submit(Callable<V> task)
+    {
+        FutureTask<V> futureTask = new FutureTask<V>(task);
+        executorService.execute(new QueueingFuture<V>(futureTask));
+    }
+
+    /**
+     * Submits a Runnable task for execution. The task is tracked
+     * until it completes so that {@link #close()} can cancel it; no
+     * handle to the task is returned.
+     *
+     * @param task the task to submit
+     */
+    public void submit(Runnable task)
+    {
+        FutureTask<Void> futureTask = new FutureTask<Void>(task, null);
+        executorService.execute(new QueueingFuture<Void>(futureTask));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
new file mode 100644
index 0000000..2cd2901
--- /dev/null
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java
@@ -0,0 +1,282 @@
+package org.apache.curator.utils;
+
+import com.google.common.collect.Lists;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestCloseableExecutorService
+{
+ private static final int QTY = 10;
+
+ private volatile ExecutorService executorService;
+ private volatile AtomicInteger count;
+
+ /**
+ * Creates a fresh fixed pool of QTY * 2 threads and resets the counter before each test.
+ */
+ @BeforeMethod
+ public void setup()
+ {
+ executorService = Executors.newFixedThreadPool(QTY * 2);
+ count = new AtomicInteger(0);
+ }
+
+ /**
+ * Shuts down the pool created in setup() after each test.
+ */
+ @AfterMethod
+ public void tearDown()
+ {
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void testBasicRunnable() throws InterruptedException
+ {
+ try
+ {
+ FutureContainer service = new FutureContainer(executorService);
+ CountDownLatch startLatch = new CountDownLatch(QTY);
+ CountDownLatch latch = new CountDownLatch(QTY);
+ for ( int i = 0; i < QTY; ++i )
+ {
+ submitRunnable(service, startLatch, latch);
+ }
+
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ service.close();
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ }
+ catch ( AssertionError e )
+ {
+ throw e;
+ }
+ catch ( Throwable e )
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Submits QTY callables that block until interrupted, closes the service and
+ * verifies that each callable's latch is counted down within 3 seconds.
+ */
+ @Test
+ public void testBasicCallable() throws InterruptedException
+ {
+ CloseableExecutorService service = new CloseableExecutorService(executorService);
+ List<CountDownLatch> latches = Lists.newArrayList();
+ for ( int i = 0; i < QTY; ++i )
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ latches.add(latch);
+ service.submit
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ // joining the current thread never returns normally; it blocks until interrupted
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ return null;
+ }
+ }
+ );
+ }
+
+ service.close();
+ for ( CountDownLatch latch : latches )
+ {
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS));
+ }
+ }
+
+ /**
+ * Submits QTY runnables that block until interrupted, cancels each returned
+ * future directly and verifies that the service no longer records any of them.
+ */
+ @Test
+ public void testListeningRunnable() throws InterruptedException
+ {
+ CloseableExecutorService service = new CloseableExecutorService(executorService);
+ List<Future<?>> futures = Lists.newArrayList();
+ for ( int i = 0; i < QTY; ++i )
+ {
+ Future<?> future = service.submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ // joining the current thread never returns normally; it blocks until interrupted
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ );
+ futures.add(future);
+ }
+
+ // presumably gives the submitted tasks time to start before they are cancelled
+ Thread.sleep(100);
+
+ for ( Future<?> future : futures )
+ {
+ future.cancel(true);
+ }
+
+ Assert.assertEquals(service.size(), 0);
+ }
+
+ /**
+ * Submits QTY callables that block until interrupted, cancels each returned
+ * future directly and verifies that the service no longer records any of them.
+ */
+ @Test
+ public void testListeningCallable() throws InterruptedException
+ {
+ CloseableExecutorService service = new CloseableExecutorService(executorService);
+ List<Future<?>> futures = Lists.newArrayList();
+ for ( int i = 0; i < QTY; ++i )
+ {
+ Future<?> future = service.submit
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ // joining the current thread never returns normally; it blocks until interrupted
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ return null;
+ }
+ }
+ );
+ futures.add(future);
+ }
+
+ // presumably gives the submitted tasks time to start before they are cancelled
+ Thread.sleep(100);
+
+ for ( Future<?> future : futures )
+ {
+ future.cancel(true);
+ }
+
+ Assert.assertEquals(service.size(), 0);
+ }
+
+ /**
+ * Verifies that closing a FutureContainer cancels only the tasks submitted
+ * through it: a task submitted directly to the shared executor must still be
+ * running (its latch still at 1) after the container is closed.
+ * Unexpected exceptions propagate so that the test fails instead of passing
+ * silently; the executor is shut down by tearDown().
+ */
+ @Test
+ public void testPartialRunnable() throws InterruptedException
+ {
+ final CountDownLatch outsideLatch = new CountDownLatch(1);
+ executorService.submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ // joining the current thread never returns normally; it blocks until interrupted
+ Thread.currentThread().join();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ outsideLatch.countDown();
+ }
+ }
+ }
+ );
+
+ FutureContainer service = new FutureContainer(executorService);
+ CountDownLatch startLatch = new CountDownLatch(QTY);
+ CountDownLatch latch = new CountDownLatch(QTY);
+ for ( int i = 0; i < QTY; ++i )
+ {
+ submitRunnable(service, startLatch, latch);
+ }
+
+ while ( service.size() < QTY )
+ {
+ Thread.sleep(100);
+ }
+
+ Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ service.close();
+ Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size());
+ Assert.assertEquals(outsideLatch.getCount(), 1);
+ }
+
+ /**
+ * Submits a runnable that counts down startLatch, increments the shared
+ * counter and then sleeps until interrupted; latch is counted down when the
+ * runnable exits. A failure to submit propagates to the calling test.
+ *
+ * @param service container to submit the runnable to
+ * @param startLatch counted down once the runnable has started
+ * @param latch counted down when the runnable exits
+ */
+ private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch)
+ {
+ service.submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ startLatch.countDown();
+ count.incrementAndGet();
+ Thread.sleep(100000);
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+ }
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..f42039c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -32,6 +32,7 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
@@ -70,7 +71,7 @@ public class PathChildrenCache implements Closeable
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String path;
- private final ExecutorService executorService;
+ private final CloseableExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
private final EnsurePath ensurePath;
@@ -197,7 +198,7 @@ public class PathChildrenCache implements Closeable
this.path = path;
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
- this.executorService = executorService;
+ this.executorService = new CloseableExecutorService(executorService);
ensurePath = client.newNamespaceAwareEnsurePath(path);
}
@@ -261,17 +262,17 @@ public class PathChildrenCache implements Closeable
mode = Preconditions.checkNotNull(mode, "mode cannot be null");
client.getConnectionStateListenable().addListener(connectionStateListener);
- executorService.execute
- (
- new Runnable()
+ executorService.submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
{
- @Override
- public void run()
- {
- mainLoop();
- }
+ mainLoop();
}
- );
+ }
+ );
switch ( mode )
{
@@ -357,7 +358,7 @@ public class PathChildrenCache implements Closeable
Preconditions.checkState(!executorService.isShutdown(), "has not been started");
client.getConnectionStateListenable().removeListener(connectionStateListener);
- executorService.shutdownNow();
+ executorService.close();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/86b82ab6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 11efefd..b540689 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.CloseableScheduledExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -42,7 +43,7 @@ public class Reaper implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
- private final ScheduledExecutorService executor;
+ private final CloseableScheduledExecutorService executor;
private final int reapingThresholdMs;
private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap());
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -127,7 +128,7 @@ public class Reaper implements Closeable
public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
{
this.client = client;
- this.executor = executor;
+ this.executor = new CloseableScheduledExecutorService(executor);
this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
}
@@ -181,14 +182,7 @@ public class Reaper implements Closeable
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
- try
- {
- executor.shutdownNow();
- }
- catch ( Exception e )
- {
- log.error("Canceling task", e);
- }
+ executor.close();
}
}