You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/02/25 17:11:40 UTC
git commit: CloseableScheduledExecutorService.scheduleWithFixedDelay
was incorrectly using a FutureTask. FutureTasks are one time use only. This
caused CloseableScheduledExecutorService.scheduleWithFixedDelay to be non
functional after the first iteratio
Repository: curator
Updated Branches:
refs/heads/CURATOR-89 [created] 7f7df9ce8
CloseableScheduledExecutorService.scheduleWithFixedDelay was incorrectly using a FutureTask. FutureTasks are one time use only.
This caused CloseableScheduledExecutorService.scheduleWithFixedDelay to be non functional after the first iteration.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7f7df9ce
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7f7df9ce
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7f7df9ce
Branch: refs/heads/CURATOR-89
Commit: 7f7df9ce8e1e4b69e169ba9e5d1236c731b3dfd0
Parents: a268727
Author: randgalt <ra...@apache.org>
Authored: Tue Feb 25 21:40:41 2014 +0530
Committer: randgalt <ra...@apache.org>
Committed: Tue Feb 25 21:40:41 2014 +0530
----------------------------------------------------------------------
.../curator/utils/CloseableExecutorService.java | 50 +++++++--
.../CloseableScheduledExecutorService.java | 10 +-
.../TestCloseableScheduledExecutorService.java | 102 +++++++++++++++++++
3 files changed, 151 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index 763f90f..5e51449 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -28,11 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -47,6 +43,48 @@ public class CloseableExecutorService implements Closeable
private final boolean shutdownOnClose;
protected final AtomicBoolean isOpen = new AtomicBoolean(true);
+ protected class InternalScheduledFutureTask implements Future<Void>
+ {
+ private final ScheduledFuture<?> scheduledFuture;
+
+ public InternalScheduledFutureTask(ScheduledFuture<?> scheduledFuture)
+ {
+ this.scheduledFuture = scheduledFuture;
+ futures.add(scheduledFuture);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ futures.remove(scheduledFuture);
+ return scheduledFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return scheduledFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return scheduledFuture.isDone();
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException
+ {
+ return null;
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return null;
+ }
+ }
+
protected class InternalFutureTask<T> extends FutureTask<T>
{
private final RunnableFuture<T> task;
@@ -74,7 +112,7 @@ public class CloseableExecutorService implements Closeable
/**
* @param executorService the service to decorate
- * @param shutdownOnClose
+ * @param shutdownOnClose if true, shutdown the executor service when this is closed
*/
public CloseableExecutorService(ExecutorService executorService, boolean shutdownOnClose)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 417272c..989293c 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -43,8 +44,8 @@ public class CloseableScheduledExecutorService extends CloseableExecutorService
}
/**
- * @param scheduledExecutorService
- * @param shutdownOnClose
+ * @param scheduledExecutorService the service to decorate
+ * @param shutdownOnClose if true, shutdown the executor service when this is closed
*/
public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService, boolean shutdownOnClose)
{
@@ -94,8 +95,7 @@ public class CloseableScheduledExecutorService extends CloseableExecutorService
{
Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
- InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
- scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit);
- return futureTask;
+ ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delay, unit);
+ return new InternalScheduledFutureTask(scheduledFuture);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
new file mode 100644
index 0000000..111a96b
--- /dev/null
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
@@ -0,0 +1,102 @@
+package org.apache.curator.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCloseableScheduledExecutorService
+{
+ private static final int QTY = 10;
+ private static final int DELAY_MS = 100;
+
+ private volatile ScheduledExecutorService executorService;
+
+ @BeforeMethod
+ public void setup()
+ {
+ executorService = Executors.newScheduledThreadPool(QTY * 2);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ executorService.shutdownNow();
+ }
+
+ @Test
+ public void testCloseableScheduleWithFixedDelay() throws InterruptedException
+ {
+ CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService);
+
+ final CountDownLatch latch = new CountDownLatch(QTY);
+ service.scheduleWithFixedDelay(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ latch.countDown();
+ }
+ },
+ DELAY_MS,
+ DELAY_MS,
+ TimeUnit.MILLISECONDS
+ );
+
+ Assert.assertTrue(latch.await((QTY * 2) * DELAY_MS, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testCloseableScheduleWithFixedDelayAndAdditionalTasks() throws InterruptedException
+ {
+ final AtomicInteger outerCounter = new AtomicInteger(0);
+ Runnable command = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ outerCounter.incrementAndGet();
+ }
+ };
+ executorService.scheduleWithFixedDelay(command, DELAY_MS, DELAY_MS, TimeUnit.MILLISECONDS);
+
+ CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService);
+
+ final AtomicInteger innerCounter = new AtomicInteger(0);
+ service.scheduleWithFixedDelay(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ innerCounter.incrementAndGet();
+ }
+ }, DELAY_MS, DELAY_MS, TimeUnit.MILLISECONDS);
+
+ Thread.sleep(DELAY_MS * 4);
+
+ service.close();
+ Thread.sleep(DELAY_MS * 2);
+
+ int innerValue = innerCounter.get();
+ Assert.assertTrue(innerValue > 0);
+
+ int value = outerCounter.get();
+ Thread.sleep(DELAY_MS * 2);
+ int newValue = outerCounter.get();
+ Assert.assertTrue(newValue > value);
+ Assert.assertEquals(innerValue, innerCounter.get());
+
+ value = newValue;
+ Thread.sleep(DELAY_MS * 2);
+ newValue = outerCounter.get();
+ Assert.assertTrue(newValue > value);
+ Assert.assertEquals(innerValue, innerCounter.get());
+ }
+}