You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:17:03 UTC

[13/17] flink git commit: [FLINK-4750] [runtime] Cleanly await end of all currently executing processing time timers when finite streams finish.

[FLINK-4750] [runtime] Cleanly await end of all currently executing processing time timers when finite streams finish.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aea8c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aea8c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aea8c8f

Branch: refs/heads/master
Commit: 8aea8c8f427f5511c6064abbc4b85a3ef106743a
Parents: 1cd8d4f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 5 14:33:01 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 20:04:34 2016 +0200

----------------------------------------------------------------------
 .../tasks/DefaultTimeServiceProvider.java       | 151 +++++++++++++++-
 .../streaming/runtime/tasks/StreamTask.java     |   3 +
 .../runtime/tasks/TestTimeServiceProvider.java  |  55 +++---
 .../runtime/tasks/TimeServiceProvider.java      |  60 +++++--
 .../operators/windowing/NoOpTimerService.java   |   7 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 179 +++++++++++++++++++
 6 files changed, 414 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 5664eac..d2c743f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,12 +17,20 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -32,6 +40,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class DefaultTimeServiceProvider extends TimeServiceProvider {
 
+	private static final int STATUS_ALIVE = 0;
+	private static final int STATUS_QUIESCED = 1;
+	private static final int STATUS_SHUTDOWN = 2;
+
+	// ------------------------------------------------------------------------
+
 	/** The containing task that owns this time service provider. */
 	private final AsyncExceptionHandler task;
 
@@ -41,6 +55,8 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	/** The executor service that schedules and calls the triggers of this task*/
 	private final ScheduledThreadPoolExecutor timerService;
 
+	private final AtomicInteger status;
+
 
 	public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
 		this(failureHandler, checkpointLock, null);
@@ -50,19 +66,24 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 			AsyncExceptionHandler task,
 			Object checkpointLock,
 			ThreadFactory threadFactory) {
-		
+
 		this.task = checkNotNull(task);
 		this.checkpointLock = checkNotNull(checkpointLock);
 
+		this.status = new AtomicInteger(STATUS_ALIVE);
+
 		if (threadFactory == null) {
 			this.timerService = new ScheduledThreadPoolExecutor(1);
 		} else {
 			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
 		}
 
-		// allow trigger tasks to be removed if all timers for
-		// that timestamp are removed by user
+		// tasks should be removed if the future is canceled
 		this.timerService.setRemoveOnCancelPolicy(true);
+
+		// make sure shutdown removes all pending tasks
+		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 	}
 
 	@Override
@@ -73,17 +94,50 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	@Override
 	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
 		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
-		return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+
+		// we directly try to register the timer and only react to the status on exception
+		// that way we save unnecessary volatile accesses for each timer
+		try {
+			return timerService.schedule(
+					new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+		}
+		catch (RejectedExecutionException e) {
+			final int status = this.status.get();
+			if (status == STATUS_QUIESCED) {
+				return new NeverCompleteFuture(delay);
+			}
+			else if (status == STATUS_SHUTDOWN) {
+				throw new IllegalStateException("Timer service is shut down");
+			}
+			else {
+				// something else happened, so propagate the exception
+				throw e;
+			}
+		}
 	}
 
 	@Override
 	public boolean isTerminated() {
-		return timerService.isTerminated();
+		return status.get() == STATUS_SHUTDOWN;
 	}
 
 	@Override
-	public void shutdownService() throws Exception {
-		timerService.shutdownNow();
+	public void quiesceAndAwaitPending() throws InterruptedException {
+		if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+			timerService.shutdown();
+
+			// await forever (almost)
+			timerService.awaitTermination(365, TimeUnit.DAYS);
+		}
+	}
+
+	@Override
+	public void shutdownService() {
+		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
+				status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
+		{
+			timerService.shutdownNow();
+		}
 	}
 
 	// safety net to destroy the thread pool
@@ -93,6 +147,18 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 		timerService.shutdownNow();
 	}
 
+	@VisibleForTesting
+	int getNumTasksScheduled() {
+		BlockingQueue<?> queue = timerService.getQueue();
+		if (queue == null) {
+			return 0;
+		} else {
+			return queue.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Internal task that is invoked by the timer service and triggers the target.
 	 */
@@ -122,4 +188,75 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 			}
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
+
+		private final Object lock = new Object();
+
+		private final long delayMillis;
+
+		private volatile boolean canceled;
+
+
+		private NeverCompleteFuture(long delayMillis) {
+			this.delayMillis = delayMillis;
+		}
+
+		@Override
+		public long getDelay(@Nonnull TimeUnit unit) {
+			return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+		}
+
+		@Override
+		public int compareTo(@Nonnull Delayed o) {
+			long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+			return Long.compare(this.delayMillis, otherMillis);
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			synchronized (lock) {
+				canceled = true;
+				lock.notifyAll();
+			}
+			return true;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return canceled;
+		}
+
+		@Override
+		public boolean isDone() {
+			return false;
+		}
+
+		@Override
+		public Object get() throws InterruptedException {
+			synchronized (lock) {
+				while (!canceled) {
+					lock.wait();
+				}
+			}
+			throw new CancellationException();
+		}
+
+		@Override
+		public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
+			synchronized (lock) {
+				while (!canceled) {
+					unit.timedWait(lock, timeout);
+				}
+
+				if (canceled) {
+					throw new CancellationException();
+				} else {
+					throw new TimeoutException();
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 040ec66..ff14249 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -269,6 +269,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			isRunning = true;
 			run();
 
+			// make sure all timers finish and no new timers can come
+			timerService.quiesceAndAwaitPending();
+
 			LOG.debug("Finished task {}", getName());
 
 			// make sure no further checkpoint and notification actions happen.

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index f4bead9..9eb6cd1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -39,6 +39,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 	private volatile long currentTime = 0;
 
 	private volatile boolean isTerminated;
+	private volatile boolean isQuiesced;
 
 	// sorts the timers by timestamp so that they are processed in the correct order.
 	private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
@@ -47,25 +48,27 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 	public void setCurrentTime(long timestamp) throws Exception {
 		this.currentTime = timestamp;
 
-		// decide which timers to fire and put them in a list
-		// we do not fire them here to be able to accommodate timers
-		// that register other timers.
-
-		Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
-		List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
-		while (it.hasNext()) {
-			Map.Entry<Long, List<Triggerable>> t = it.next();
-			if (t.getKey() <= this.currentTime) {
-				toRun.add(t);
-				it.remove();
+		if (!isQuiesced) {
+			// decide which timers to fire and put them in a list
+			// we do not fire them here to be able to accommodate timers
+			// that register other timers.
+	
+			Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+			List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+			while (it.hasNext()) {
+				Map.Entry<Long, List<Triggerable>> t = it.next();
+				if (t.getKey() <= this.currentTime) {
+					toRun.add(t);
+					it.remove();
+				}
 			}
-		}
-
-		// now do the actual firing.
-		for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
-			long now = tasks.getKey();
-			for (Triggerable task: tasks.getValue()) {
-				task.trigger(now);
+	
+			// now do the actual firing.
+			for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+				long now = tasks.getKey();
+				for (Triggerable task: tasks.getValue()) {
+					task.trigger(now);
+				}
 			}
 		}
 	}
@@ -80,6 +83,9 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 		if (isTerminated) {
 			throw new IllegalStateException("terminated");
 		}
+		if (isQuiesced) {
+			return new DummyFuture();
+		}
 
 		if (timestamp <= currentTime) {
 			try {
@@ -88,7 +94,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 				throw new RuntimeException(e);
 			}
 		}
-
 		List<Triggerable> tasks = registeredTasks.get(timestamp);
 		if (tasks == null) {
 			tasks = new ArrayList<>();
@@ -105,8 +110,16 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 	}
 
 	@Override
-	public void shutdownService() throws Exception {
-		isTerminated = true;
+	public void quiesceAndAwaitPending() {
+		if (!isTerminated) {
+			isQuiesced = true;
+			registeredTasks.clear();
+		}
+	}
+
+	@Override
+	public void shutdownService() {
+		this.isTerminated = true;
 	}
 
 	public int getNumRegisteredTimers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index 42a4fa4..afa6f35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -14,32 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.concurrent.ScheduledFuture;
 
 /**
  * Defines the current processing time and handles all related actions,
  * such as register timers for tasks to be executed in the future.
+ * 
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
+ * whether the timer service has been shut down.
+ * 
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ *     <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
+ *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ *         {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
+ *         return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ *         timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
+ *         will result in a hard exception.</li>
+ * </ol>
  */
 public abstract class TimeServiceProvider {
 
-	/** Returns the current processing time. */
+	/**
+	 * Returns the current processing time.
+	 */
 	public abstract long getCurrentProcessingTime();
 
-	/** Registers a task to be executed when (processing) time is {@code timestamp}.
-	 * @param timestamp
-	 * 						when the task is to be executed (in processing time)
-	 * @param target
-	 * 						the task to be executed
-	 * @return the result to be returned.
+	/**
+	 * Registers a task to be executed when (processing) time is {@code timestamp}.
+	 * 
+	 * @param timestamp   Time when the task is to be executed (in processing time)
+	 * @param target      The task to be executed
+	 * 
+	 * @return The future that represents the scheduled task. This always returns some future,
+	 *         even if the timer was shut down
 	 */
-	public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target);
+	public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
 
-	/** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. */
+	/**
+	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
+	 */
 	public abstract boolean isTerminated();
 
-	/** Shuts down and clean up the timer service provider. */
-	public abstract void shutdownService() throws Exception;
+	/**
+	 * This method puts the service into a state where it does not register new timers, but
+	 * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
+	 * Furthermore, the method clears all not yet started timers, and awaits the completion
+	 * of currently executing timers.
+	 * 
+	 * <p>This method can be used to cleanly shut down the timer service. The using components
+	 * will not notice that the service is shut down (as for example via exceptions when registering
+	 * a new timer), but the service will simply not fire any timer any more.
+	 */
+	public abstract void quiesceAndAwaitPending() throws InterruptedException;
+
+	/**
+	 * Shuts down and clean up the timer service provider hard and immediately. This does not wait
+	 * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
+	 * will result in a hard exception.
+	 */
+	public abstract void shutdownService();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index 16e658e..d0c5050 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledFuture;
 class NoOpTimerService extends TimeServiceProvider {
 
 	private volatile boolean terminated;
-	
+
 	@Override
 	public long getCurrentProcessingTime() {
 		return System.currentTimeMillis();
@@ -43,7 +43,10 @@ class NoOpTimerService extends TimeServiceProvider {
 	}
 
 	@Override
-	public void shutdownService() throws Exception {
+	public void quiesceAndAwaitPending() {}
+
+	@Override
+	public void shutdownService() {
 		terminated = true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
new file mode 100644
index 0000000..ae895b6
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DefaultTimeServiceProviderTest {
+
+	@Test
+	public void testImmediateShutdown() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertFalse(timer.isTerminated());
+
+			final OneShotLatch latch = new OneShotLatch();
+
+			// the task should trigger immediately and should block until terminated with interruption
+			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					latch.trigger();
+					Thread.sleep(100000000);
+				}
+			});
+
+			latch.await();
+			timer.shutdownService();
+
+			// can only enter this scope after the triggerable is interrupted
+			//noinspection SynchronizationOnLocalVariableOrMethodParameter
+			synchronized (lock) {
+				assertTrue(timer.isTerminated());
+			}
+
+			try {
+				timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+					@Override
+					public void trigger(long timestamp) {}
+				});
+
+				fail("should result in an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			// obviously, we have an asynchronous interrupted exception
+			assertNotNull(errorRef.get());
+			assertTrue(errorRef.get().getCause() instanceof InterruptedException);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testQuiescing() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			final OneShotLatch latch = new OneShotLatch();
+
+			final ReentrantLock scopeLock = new ReentrantLock();
+
+			timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					scopeLock.lock();
+					try {
+						latch.trigger();
+						// delay a bit before leaving the method
+						Thread.sleep(5);
+					} finally {
+						scopeLock.unlock();
+					}
+				}
+			});
+
+			// after the task triggered, shut the timer down cleanly, waiting for the task to finish
+			latch.await();
+			timer.quiesceAndAwaitPending();
+
+			// should be able to immediately acquire the lock, since the task must have exited by now 
+			assertTrue(scopeLock.tryLock());
+
+			// should be able to schedule more tasks (that never get executed)
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					throw new Exception("test");
+				}
+			});
+			assertNotNull(future);
+
+			// nothing should be scheduled right now
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported - that ensures that the newly scheduled 
+			// triggerable did, in fact, not trigger
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testFutureCancellation() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) {}
+			});
+			assertEquals(1, timer.getNumTasksScheduled());
+
+			future.cancel(false);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+}