You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:17:03 UTC
[13/17] flink git commit: [FLINK-4750] [runtime] Cleanly await end of
all currently executing processing time timers when finite streams finish.
[FLINK-4750] [runtime] Cleanly await end of all currently executing processing time timers when finite streams finish.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aea8c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aea8c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aea8c8f
Branch: refs/heads/master
Commit: 8aea8c8f427f5511c6064abbc4b85a3ef106743a
Parents: 1cd8d4f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 5 14:33:01 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 20:04:34 2016 +0200
----------------------------------------------------------------------
.../tasks/DefaultTimeServiceProvider.java | 151 +++++++++++++++-
.../streaming/runtime/tasks/StreamTask.java | 3 +
.../runtime/tasks/TestTimeServiceProvider.java | 55 +++---
.../runtime/tasks/TimeServiceProvider.java | 60 +++++--
.../operators/windowing/NoOpTimerService.java | 7 +-
.../tasks/DefaultTimeServiceProviderTest.java | 179 +++++++++++++++++++
6 files changed, 414 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 5664eac..d2c743f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,12 +17,20 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -32,6 +40,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class DefaultTimeServiceProvider extends TimeServiceProvider {
+ private static final int STATUS_ALIVE = 0;
+ private static final int STATUS_QUIESCED = 1;
+ private static final int STATUS_SHUTDOWN = 2;
+
+ // ------------------------------------------------------------------------
+
/** The containing task that owns this time service provider. */
private final AsyncExceptionHandler task;
@@ -41,6 +55,8 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
/** The executor service that schedules and calls the triggers of this task*/
private final ScheduledThreadPoolExecutor timerService;
+ private final AtomicInteger status;
+
public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
this(failureHandler, checkpointLock, null);
@@ -50,19 +66,24 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
AsyncExceptionHandler task,
Object checkpointLock,
ThreadFactory threadFactory) {
-
+
this.task = checkNotNull(task);
this.checkpointLock = checkNotNull(checkpointLock);
+ this.status = new AtomicInteger(STATUS_ALIVE);
+
if (threadFactory == null) {
this.timerService = new ScheduledThreadPoolExecutor(1);
} else {
this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
}
- // allow trigger tasks to be removed if all timers for
- // that timestamp are removed by user
+ // tasks should be removed if the future is canceled
this.timerService.setRemoveOnCancelPolicy(true);
+
+ // make sure shutdown removes all pending tasks
+ this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
@Override
@@ -73,17 +94,50 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
@Override
public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
- return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+
+ // we directly try to register the timer and only react to the status on exception
+ // that way we save unnecessary volatile accesses for each timer
+ try {
+ return timerService.schedule(
+ new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+ }
+ catch (RejectedExecutionException e) {
+ final int status = this.status.get();
+ if (status == STATUS_QUIESCED) {
+ return new NeverCompleteFuture(delay);
+ }
+ else if (status == STATUS_SHUTDOWN) {
+ throw new IllegalStateException("Timer service is shut down");
+ }
+ else {
+ // something else happened, so propagate the exception
+ throw e;
+ }
+ }
}
@Override
public boolean isTerminated() {
- return timerService.isTerminated();
+ return status.get() == STATUS_SHUTDOWN;
}
@Override
- public void shutdownService() throws Exception {
- timerService.shutdownNow();
+ public void quiesceAndAwaitPending() throws InterruptedException {
+ if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+ timerService.shutdown();
+
+ // await forever (almost)
+ timerService.awaitTermination(365, TimeUnit.DAYS);
+ }
+ }
+
+	/**
+	 * Moves this service into the shut-down state and stops the timer executor immediately.
+	 *
+	 * <p>The transition is attempted from the alive state first and from the quiesced state
+	 * second. If neither transition succeeds, the call has no effect.
+	 */
+	@Override
+	public void shutdownService() {
+		final boolean wasAlive = status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN);
+		final boolean transitioned = wasAlive || status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN);
+
+		if (!transitioned) {
+			// neither transition applied, so there is nothing to stop from here
+			return;
+		}
+
+		timerService.shutdownNow();
+	}
// safety net to destroy the thread pool
@@ -93,6 +147,18 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
timerService.shutdownNow();
}
+ @VisibleForTesting
+ int getNumTasksScheduled() {
+ BlockingQueue<?> queue = timerService.getQueue();
+ if (queue == null) {
+ return 0;
+ } else {
+ return queue.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
/**
* Internal task that is invoked by the timer service and triggers the target.
*/
@@ -122,4 +188,75 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
}
}
}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * A {@link ScheduledFuture} that never produces a result. It is handed out by
+	 * {@code registerTimer} while the service is quiesced, so that callers still receive
+	 * a future even though no timer was scheduled.
+	 *
+	 * <p>The only state change this future supports is cancellation. Both {@code get}
+	 * variants throw a {@link CancellationException} once the future was canceled.
+	 */
+	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
+
+		/** Monitor on which the {@code get} methods wait for a cancellation. */
+		private final Object lock = new Object();
+
+		/** The delay (in milliseconds) that was requested when the future was created. */
+		private final long delayMillis;
+
+		/** Set once by {@link #cancel(boolean)}; never reset. */
+		private volatile boolean canceled;
+
+
+		private NeverCompleteFuture(long delayMillis) {
+			this.delayMillis = delayMillis;
+		}
+
+		/**
+		 * Returns the originally requested delay, converted to the given unit.
+		 * The value is fixed at construction time and does not count down.
+		 */
+		@Override
+		public long getDelay(@Nonnull TimeUnit unit) {
+			return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+		}
+
+		/**
+		 * Orders this future relative to another delayed object by comparing the
+		 * delays in milliseconds.
+		 */
+		@Override
+		public int compareTo(@Nonnull Delayed o) {
+			long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+			return Long.compare(this.delayMillis, otherMillis);
+		}
+
+		/**
+		 * Marks the future as canceled and wakes up all threads blocked in a {@code get} call.
+		 *
+		 * @param mayInterruptIfRunning Ignored, since there is no task that could be running.
+		 * @return Always {@code true}.
+		 */
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			synchronized (lock) {
+				canceled = true;
+				lock.notifyAll();
+			}
+			return true;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return canceled;
+		}
+
+		/**
+		 * Returns whether the future is done. The future never completes with a result, so
+		 * it is done exactly when it was canceled. The {@link java.util.concurrent.Future}
+		 * contract requires {@code isDone()} to return {@code true} after a cancellation.
+		 */
+		@Override
+		public boolean isDone() {
+			return canceled;
+		}
+
+		/**
+		 * Blocks until the future is canceled.
+		 *
+		 * @return Never returns normally.
+		 * @throws CancellationException Thrown once the future was canceled.
+		 * @throws InterruptedException Thrown if the waiting thread is interrupted.
+		 */
+		@Override
+		public Object get() throws InterruptedException {
+			synchronized (lock) {
+				while (!canceled) {
+					lock.wait();
+				}
+			}
+			throw new CancellationException();
+		}
+
+		/**
+		 * Blocks until the future is canceled or the timeout elapsed, whichever happens first.
+		 *
+		 * @param timeout The maximum time to wait; a non-positive value does not wait at all.
+		 * @param unit The unit of the timeout.
+		 * @return Never returns normally.
+		 * @throws CancellationException Thrown if the future was canceled within the timeout.
+		 * @throws TimeoutException Thrown if the timeout elapsed without a cancellation.
+		 * @throws InterruptedException Thrown if the waiting thread is interrupted.
+		 */
+		@Override
+		public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
+			// track an absolute deadline, so that spurious wake-ups do not extend the total wait
+			final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
+
+			synchronized (lock) {
+				while (!canceled) {
+					// difference-based comparison stays correct if nanoTime() wraps around
+					final long remainingNanos = deadlineNanos - System.nanoTime();
+					if (remainingNanos <= 0) {
+						throw new TimeoutException();
+					}
+					TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
+				}
+				throw new CancellationException();
+			}
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 040ec66..ff14249 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -269,6 +269,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
isRunning = true;
run();
+ // make sure all timers finish and no new timers can come
+ timerService.quiesceAndAwaitPending();
+
LOG.debug("Finished task {}", getName());
// make sure no further checkpoint and notification actions happen.
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index f4bead9..9eb6cd1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -39,6 +39,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
private volatile long currentTime = 0;
private volatile boolean isTerminated;
+ private volatile boolean isQuiesced;
// sorts the timers by timestamp so that they are processed in the correct order.
private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
@@ -47,25 +48,27 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
public void setCurrentTime(long timestamp) throws Exception {
this.currentTime = timestamp;
- // decide which timers to fire and put them in a list
- // we do not fire them here to be able to accommodate timers
- // that register other timers.
-
- Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
- List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
- while (it.hasNext()) {
- Map.Entry<Long, List<Triggerable>> t = it.next();
- if (t.getKey() <= this.currentTime) {
- toRun.add(t);
- it.remove();
+ if (!isQuiesced) {
+ // decide which timers to fire and put them in a list
+ // we do not fire them here to be able to accommodate timers
+ // that register other timers.
+
+ Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+ List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+ while (it.hasNext()) {
+ Map.Entry<Long, List<Triggerable>> t = it.next();
+ if (t.getKey() <= this.currentTime) {
+ toRun.add(t);
+ it.remove();
+ }
}
- }
-
- // now do the actual firing.
- for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
- long now = tasks.getKey();
- for (Triggerable task: tasks.getValue()) {
- task.trigger(now);
+
+ // now do the actual firing.
+ for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+ long now = tasks.getKey();
+ for (Triggerable task: tasks.getValue()) {
+ task.trigger(now);
+ }
}
}
}
@@ -80,6 +83,9 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
if (isTerminated) {
throw new IllegalStateException("terminated");
}
+ if (isQuiesced) {
+ return new DummyFuture();
+ }
if (timestamp <= currentTime) {
try {
@@ -88,7 +94,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
throw new RuntimeException(e);
}
}
-
List<Triggerable> tasks = registeredTasks.get(timestamp);
if (tasks == null) {
tasks = new ArrayList<>();
@@ -105,8 +110,16 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
}
@Override
- public void shutdownService() throws Exception {
- isTerminated = true;
+ public void quiesceAndAwaitPending() {
+ if (!isTerminated) {
+ isQuiesced = true;
+ registeredTasks.clear();
+ }
+ }
+
+ @Override
+ public void shutdownService() {
+ this.isTerminated = true; // only flips the flag; the isTerminated check seen elsewhere in this class (presumably in registerTimer) then throws IllegalStateException
+ }
public int getNumRegisteredTimers() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index 42a4fa4..afa6f35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -14,32 +14,70 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+
import java.util.concurrent.ScheduledFuture;
/**
* Defines the current processing time and handles all related actions,
* such as register timers for tasks to be executed in the future.
+ *
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
+ * whether the timer service has been shut down.
+ *
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
+ * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
+ * return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
+ * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
+ * will result in a hard exception.</li>
+ * </ol>
*/
public abstract class TimeServiceProvider {
- /** Returns the current processing time. */
+ /**
+ * Returns the current processing time.
+ */
public abstract long getCurrentProcessingTime();
- /** Registers a task to be executed when (processing) time is {@code timestamp}.
- * @param timestamp
- * when the task is to be executed (in processing time)
- * @param target
- * the task to be executed
- * @return the result to be returned.
+ /**
+ * Registers a task to be executed when (processing) time is {@code timestamp}.
+ *
+ * @param timestamp Time when the task is to be executed (in processing time)
+ * @param target The task to be executed
+ *
+ * @return The future that represents the scheduled task. This always returns some future,
+ * even if the timer service was quiesced. After {@link #shutdownService()}, the call
+ * fails with an exception instead.
*/
- public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target);
+ public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
- /** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. */
+ /**
+ * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
+ */
public abstract boolean isTerminated();
- /** Shuts down and clean up the timer service provider. */
- public abstract void shutdownService() throws Exception;
+ /**
+ * This method puts the service into a state where it does not register new timers, but
+ * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
+ * Furthermore, the method clears all not yet started timers, and awaits the completion
+ * of currently executing timers.
+ *
+ * <p>This method can be used to cleanly shut down the timer service. The using components
+ * will not notice that the service is shut down (as for example via exceptions when registering
+ * a new timer), but the service will simply not fire any timer any more.
+ */
+ public abstract void quiesceAndAwaitPending() throws InterruptedException;
+
+ /**
+ * Shuts down and cleans up the timer service provider hard and immediately. This does not wait
+ * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
+ * will result in a hard exception.
+ */
+ public abstract void shutdownService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index 16e658e..d0c5050 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledFuture;
class NoOpTimerService extends TimeServiceProvider {
private volatile boolean terminated;
-
+
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
@@ -43,7 +43,10 @@ class NoOpTimerService extends TimeServiceProvider {
}
@Override
- public void shutdownService() throws Exception {
+ public void quiesceAndAwaitPending() {}
+
+ @Override
+ public void shutdownService() {
terminated = true;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
new file mode 100644
index 0000000..ae895b6
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for the immediate shutdown, the quiescing, and the timer cancellation behavior
+ * of {@link DefaultTimeServiceProvider}.
+ */
+public class DefaultTimeServiceProviderTest {
+ /**
+ * Checks that {@code shutdownService()} interrupts a timer that is currently executing,
+ * that the interruption is reported to the exception handler, that later timer
+ * registrations fail with an {@link IllegalStateException}, and that no task stays scheduled.
+ */
+ @Test
+ public void testImmediateShutdown() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ assertFalse(timer.isTerminated());
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ // the task should trigger immediately and should block until terminated with interruption
+ timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ latch.trigger();
+ Thread.sleep(100000000);
+ }
+ });
+
+ latch.await();
+ timer.shutdownService();
+
+ // can only enter this scope after the triggerable is interrupted
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (lock) {
+ assertTrue(timer.isTerminated());
+ }
+
+ try {
+ timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {}
+ });
+
+ fail("should result in an exception");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ }
+
+ // obviously, we have an asynchronous interrupted exception
+ assertNotNull(errorRef.get());
+ assertTrue(errorRef.get().getCause() instanceof InterruptedException);
+
+ assertEquals(0, timer.getNumTasksScheduled());
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+ /**
+ * Checks that {@code quiesceAndAwaitPending()} returns only after the currently executing
+ * timer has left its trigger method, and that a timer registered afterwards yields a
+ * non-null future but is neither scheduled nor triggered.
+ */
+ @Test
+ public void testQuiescing() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ final OneShotLatch latch = new OneShotLatch();
+
+ final ReentrantLock scopeLock = new ReentrantLock();
+
+ timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ scopeLock.lock();
+ try {
+ latch.trigger();
+ // delay a bit before leaving the method
+ Thread.sleep(5);
+ } finally {
+ scopeLock.unlock();
+ }
+ }
+ });
+
+ // after the task triggered, shut the timer down cleanly, waiting for the task to finish
+ latch.await();
+ timer.quiesceAndAwaitPending();
+
+ // should be able to immediately acquire the lock, since the task must have exited by now
+ assertTrue(scopeLock.tryLock());
+
+ // should be able to schedule more tasks (that never get executed)
+ ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ throw new Exception("test");
+ }
+ });
+ assertNotNull(future);
+
+ // nothing should be scheduled right now
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // check that no asynchronous error was reported - that ensures that the newly scheduled
+ // triggerable did, in fact, not trigger
+ if (errorRef.get() != null) {
+ throw new Exception(errorRef.get());
+ }
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+ /**
+ * Checks that canceling the future returned by {@code registerTimer} removes the
+ * pending timer, so that the number of scheduled tasks drops from one back to zero.
+ */
+ @Test
+ public void testFutureCancellation() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // schedule something
+ ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {}
+ });
+ assertEquals(1, timer.getNumTasksScheduled());
+
+ future.cancel(false);
+
+ assertEquals(0, timer.getNumTasksScheduled());
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+}