You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/02/25 17:11:40 UTC

git commit: CloseableScheduledExecutorService.scheduleWithFixedDelay was incorrectly using a FutureTask. FutureTasks are one time use only. This caused CloseableScheduledExecutorService.scheduleWithFixedDelay to be non functional after the first iteration.

Repository: curator
Updated Branches:
  refs/heads/CURATOR-89 [created] 7f7df9ce8


CloseableScheduledExecutorService.scheduleWithFixedDelay was incorrectly using a FutureTask. FutureTasks are one time use only.
This caused CloseableScheduledExecutorService.scheduleWithFixedDelay to be non functional after the first iteration.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7f7df9ce
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7f7df9ce
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7f7df9ce

Branch: refs/heads/CURATOR-89
Commit: 7f7df9ce8e1e4b69e169ba9e5d1236c731b3dfd0
Parents: a268727
Author: randgalt <ra...@apache.org>
Authored: Tue Feb 25 21:40:41 2014 +0530
Committer: randgalt <ra...@apache.org>
Committed: Tue Feb 25 21:40:41 2014 +0530

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java |  50 +++++++--
 .../CloseableScheduledExecutorService.java      |  10 +-
 .../TestCloseableScheduledExecutorService.java  | 102 +++++++++++++++++++
 3 files changed, 151 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
index 763f90f..5e51449 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java
@@ -28,11 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -47,6 +43,48 @@ public class CloseableExecutorService implements Closeable
     private final boolean shutdownOnClose;
     protected final AtomicBoolean isOpen = new AtomicBoolean(true);
 
+    protected class InternalScheduledFutureTask implements Future<Void> // Future<Void> handle over a ScheduledFuture that is also registered in futures
+    {
+        private final ScheduledFuture<?> scheduledFuture; // wrapped future; cancel, isCancelled and isDone are forwarded to it
+        /** Stores the given future and adds it to {@code futures} (declared outside this class; presumably the collection cancelled on close -- TODO confirm). */
+        public InternalScheduledFutureTask(ScheduledFuture<?> scheduledFuture)
+        {
+            this.scheduledFuture = scheduledFuture;
+            futures.add(scheduledFuture);
+        }
+        /** Removes the wrapped future from {@code futures}, then cancels it and returns the result of that cancel call. */
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            futures.remove(scheduledFuture);
+            return scheduledFuture.cancel(mayInterruptIfRunning);
+        }
+        /** Reports whether the wrapped future was cancelled. */
+        @Override
+        public boolean isCancelled()
+        {
+            return scheduledFuture.isCancelled();
+        }
+        /** Reports whether the wrapped future is done. */
+        @Override
+        public boolean isDone()
+        {
+            return scheduledFuture.isDone();
+        }
+        /** Returns null right away without consulting the wrapped future. NOTE(review): never blocks and never surfaces a task failure -- confirm this is intended. */
+        @Override
+        public Void get() throws InterruptedException, ExecutionException
+        {
+            return null; // the declared exceptions are never thrown from here
+        }
+        /** Returns null right away; {@code timeout} and {@code unit} are ignored. NOTE(review): same caveat as the no-argument overload. */
+        @Override
+        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+        {
+            return null; // the declared exceptions are never thrown from here
+        }
+    }
+
     protected class InternalFutureTask<T> extends FutureTask<T>
     {
         private final RunnableFuture<T> task;
@@ -74,7 +112,7 @@ public class CloseableExecutorService implements Closeable
 
     /**
      * @param executorService the service to decorate
-     * @param shutdownOnClose
+     * @param shutdownOnClose if true, shutdown the executor service when this is closed
      */
     public CloseableExecutorService(ExecutorService executorService, boolean shutdownOnClose)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
index 417272c..989293c 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,8 +44,8 @@ public class CloseableScheduledExecutorService extends CloseableExecutorService
     }
 
     /**
-     * @param scheduledExecutorService
-     * @param shutdownOnClose
+     * @param scheduledExecutorService the service to decorate
+     * @param shutdownOnClose if true, shutdown the executor service when this is closed
      */
     public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService, boolean shutdownOnClose)
     {
@@ -94,8 +95,7 @@ public class CloseableScheduledExecutorService extends CloseableExecutorService
     {
         Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed");
 
-        InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null));
-        scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit);
-        return futureTask;
+        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delay, unit);
+        return new InternalScheduledFutureTask(scheduledFuture);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/7f7df9ce/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
new file mode 100644
index 0000000..111a96b
--- /dev/null
+++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableScheduledExecutorService.java
@@ -0,0 +1,102 @@
+package org.apache.curator.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCloseableScheduledExecutorService // timing-based TestNG checks for CloseableScheduledExecutorService.scheduleWithFixedDelay
+{
+    private static final int QTY = 10; // latch count in the first test; the thread pool gets QTY * 2 threads
+    private static final int DELAY_MS = 100; // initial delay and fixed delay, in milliseconds, of every scheduled task
+    // Raw executor shared by the tests: created in setup() and stopped in tearDown().
+    private volatile ScheduledExecutorService executorService;
+    /** Creates a fresh scheduled thread pool with QTY * 2 threads. */
+    @BeforeMethod
+    public void setup()
+    {
+        executorService = Executors.newScheduledThreadPool(QTY * 2);
+    }
+    /** Stops the raw executor with shutdownNow(). */
+    @AfterMethod
+    public void tearDown()
+    {
+        executorService.shutdownNow();
+    }
+    /** Checks that a fixed-delay task scheduled through the wrapper runs repeatedly: QTY runs must happen within (QTY * 2) * DELAY_MS milliseconds. */
+    @Test
+    public void testCloseableScheduleWithFixedDelay() throws InterruptedException
+    {
+        CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService);
+        // Each run counts the latch down once, so it only reaches zero if the task keeps firing.
+        final CountDownLatch latch = new CountDownLatch(QTY);
+        service.scheduleWithFixedDelay(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        latch.countDown();
+                    }
+                },
+                DELAY_MS,
+                DELAY_MS,
+                TimeUnit.MILLISECONDS
+        );
+        // await returns false on timeout, which fails the assertion.
+        Assert.assertTrue(latch.await((QTY * 2) * DELAY_MS, TimeUnit.MILLISECONDS));
+    }
+    /** Checks that closing the wrapper stops the task scheduled through it while a task scheduled directly on the raw executor keeps running. */
+    @Test
+    public void testCloseableScheduleWithFixedDelayAndAdditionalTasks() throws InterruptedException
+    {
+        final AtomicInteger outerCounter = new AtomicInteger(0); // counts runs of the task scheduled on the raw executor
+        Runnable command = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                outerCounter.incrementAndGet();
+            }
+        };
+        executorService.scheduleWithFixedDelay(command, DELAY_MS, DELAY_MS, TimeUnit.MILLISECONDS);
+        // NOTE(review): the assertions below need close() to leave the raw executor running; presumably the one-argument constructor does not shut it down -- confirm.
+        CloseableScheduledExecutorService service = new CloseableScheduledExecutorService(executorService);
+        // Counts runs of the task scheduled through the wrapper.
+        final AtomicInteger innerCounter = new AtomicInteger(0);
+        service.scheduleWithFixedDelay(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                innerCounter.incrementAndGet();
+            }
+        }, DELAY_MS, DELAY_MS, TimeUnit.MILLISECONDS);
+        // Give both tasks time to run a few times before closing the wrapper.
+        Thread.sleep(DELAY_MS * 4);
+        // Close only the wrapper, then wait before taking the snapshot of the inner counter.
+        service.close();
+        Thread.sleep(DELAY_MS * 2);
+        // Snapshot taken after close: the inner task must have run at least once.
+        int innerValue = innerCounter.get();
+        Assert.assertTrue(innerValue > 0);
+        // First check: the outer counter advances while the inner counter stays at the snapshot.
+        int value = outerCounter.get();
+        Thread.sleep(DELAY_MS * 2);
+        int newValue = outerCounter.get();
+        Assert.assertTrue(newValue > value);
+        Assert.assertEquals(innerValue, innerCounter.get());
+        // Second check: repeat once more with the latest outer value as the baseline.
+        value = newValue;
+        Thread.sleep(DELAY_MS * 2);
+        newValue = outerCounter.get();
+        Assert.assertTrue(newValue > value);
+        Assert.assertEquals(innerValue, innerCounter.get());
+    }
+}