You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/15 12:20:29 UTC

[flink] 01/04: [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3aa68d52af710ad6c0574d5a90ee898f5e58efc9
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Mon Jul 8 17:11:46 2019 +0200

    [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 ++---
 .../runtime/tasks/SynchronousSavepointLatch.java   |  55 ++++----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 103 +++++----------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  27 +---
 .../tasks/SynchronousSavepointSyncLatchTest.java   | 144 +++++++++++----------
 5 files changed, 148 insertions(+), 210 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 52e2011..7960a2f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -686,7 +686,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			CheckpointMetrics checkpointMetrics) throws Exception {
 
 		try {
-			performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false);
+			if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false)) {
+				if (syncSavepointLatch.isSet()) {
+					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
+				}
+			}
 		}
 		catch (CancelTaskException e) {
 			LOG.info("Operator {} was cancelled while performing checkpoint {}.",
@@ -723,7 +727,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		final long checkpointId = checkpointMetaData.getCheckpointId();
 
-		final boolean result;
 		synchronized (lock) {
 			if (isRunning) {
 
@@ -754,7 +757,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				//           impact progress of the streaming topology
 				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 
-				result = true;
+				return true;
 			}
 			else {
 				// we cannot perform our checkpoint - let the downstream operators know that they
@@ -779,21 +782,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					throw exception;
 				}
 
-				result = false;
-			}
-		}
-
-		if (isRunning && syncSavepointLatch.isSet()) {
-
-			final boolean checkpointWasAcked =
-					syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
-
-			if (checkpointWasAcked) {
-				finishTask();
+				return false;
 			}
 		}
-
-		return result;
 	}
 
 	public ExecutorService getAsyncOperationsThreadPool() {
@@ -802,6 +793,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		boolean success = false;
 		synchronized (lock) {
 			if (isRunning) {
 				LOG.debug("Notification of complete checkpoint for task {}", getName());
@@ -812,12 +804,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					}
 				}
 
-				syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId);
+				success = true;
 			}
 			else {
 				LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
 			}
 		}
+		if (success) {
+			syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);
+		}
 	}
 
 	private void tryShutdownTimerService() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
index 0a67dac..d10c19c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.util.function.RunnableWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A synchronization primitive used by the {@link StreamTask} to wait
@@ -29,12 +33,17 @@ class SynchronousSavepointLatch {
 
 	private static final long NOT_SET_CHECKPOINT_ID = -1L;
 
-	// these are mutually exclusive
+	enum CompletionResult {
+		COMPLETED,
+		CANCELED,
+	}
+
+	@GuardedBy("synchronizationPoint")
 	private volatile boolean waiting;
-	private volatile boolean completed;
-	private volatile boolean canceled;
 
-	private volatile boolean wasAlreadyCompleted;
+	@GuardedBy("synchronizationPoint")
+	@Nullable
+	private volatile CompletionResult completionResult;
 
 	private final Object synchronizationPoint;
 
@@ -44,8 +53,6 @@ class SynchronousSavepointLatch {
 		this.synchronizationPoint = new Object();
 
 		this.waiting = false;
-		this.completed = false;
-		this.canceled = false;
 		this.checkpointId = NOT_SET_CHECKPOINT_ID;
 	}
 
@@ -59,45 +66,38 @@ class SynchronousSavepointLatch {
 		}
 	}
 
-	boolean blockUntilCheckpointIsAcknowledged() throws Exception {
+	void blockUntilCheckpointIsAcknowledged() throws Exception {
 		synchronized (synchronizationPoint) {
-			if (isSet() && !isDone()) {
+			if (completionResult == null && isSet()) {
 				waiting = true;
 				synchronizationPoint.wait();
 				waiting = false;
 			}
-
-			if (!isCanceled() && !wasAlreadyCompleted) {
-				wasAlreadyCompleted = true;
-				return true;
-			}
-
-			return false;
 		}
 	}
 
-	void acknowledgeCheckpointAndTrigger(final long checkpointId) {
+	void acknowledgeCheckpointAndTrigger(final long checkpointId, RunnableWithException runnable) throws Exception {
 		synchronized (synchronizationPoint) {
-			if (isSet() && !isDone() && this.checkpointId == checkpointId) {
-				completed = true;
-				synchronizationPoint.notifyAll();
+			if (completionResult == null && this.checkpointId == checkpointId) {
+				completionResult = CompletionResult.COMPLETED;
+				try {
+					runnable.run();
+				} finally {
+					synchronizationPoint.notifyAll();
+				}
 			}
 		}
 	}
 
 	void cancelCheckpointLatch() {
 		synchronized (synchronizationPoint) {
-			if (!isDone()) {
-				canceled = true;
+			if (completionResult == null) {
+				completionResult = CompletionResult.CANCELED;
 				synchronizationPoint.notifyAll();
 			}
 		}
 	}
 
-	private boolean isDone () {
-		return canceled || completed;
-	}
-
 	@VisibleForTesting
 	boolean isWaiting() {
 		return waiting;
@@ -105,15 +105,14 @@ class SynchronousSavepointLatch {
 
 	@VisibleForTesting
 	boolean isCompleted() {
-		return completed;
+		return completionResult == CompletionResult.COMPLETED;
 	}
 
 	@VisibleForTesting
 	boolean isCanceled() {
-		return canceled;
+		return completionResult == CompletionResult.CANCELED;
 	}
 
-	@VisibleForTesting
 	boolean isSet() {
 		return checkpointId != NOT_SET_CHECKPOINT_ID;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 9295788..4a613d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -33,62 +32,56 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * A test verifying the termination process
  * (synchronous checkpoint and task termination) at the {@link SourceStreamTask}.
  */
-public class SourceTaskTerminationTest {
+public class SourceTaskTerminationTest extends TestLogger {
 
 	private static OneShotLatch ready;
 	private static MultiShotLatch runLoopStart;
 	private static MultiShotLatch runLoopEnd;
 
-	private static AtomicReference<Throwable> error;
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(20);
 
 	@Before
 	public void initialize() {
 		ready = new OneShotLatch();
 		runLoopStart = new MultiShotLatch();
 		runLoopEnd = new MultiShotLatch();
-		error = new AtomicReference<>();
-
-		error.set(null);
-	}
-
-	@After
-	public void validate() {
-		validateNoExceptionsWereThrown();
 	}
 
 	@Test
-	public void terminateShouldBlockDuringCheckpointingAndEmitMaxWatermark() throws Exception {
+	public void testStopWithSavepointWithMaxWatermark() throws Exception {
 		stopWithSavepointStreamTaskTestHelper(true);
 	}
 
 	@Test
-	public void suspendShouldBlockDuringCheckpointingAndNotEmitMaxWatermark() throws Exception {
+	public void testStopWithSavepointWithoutMaxWatermark() throws Exception {
 		stopWithSavepointStreamTaskTestHelper(false);
 	}
 
-	private void stopWithSavepointStreamTaskTestHelper(final boolean expectMaxWatermark) throws Exception {
+	private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark) throws Exception {
 		final long syncSavepointId = 34L;
 
 		final StreamTaskTestHarness<Long> srcTaskTestHarness = getSourceStreamTaskTestHarness();
 		final Thread executionThread = srcTaskTestHarness.invoke();
 		final StreamTask<Long, ?> srcTask = srcTaskTestHarness.getTask();
+		final SynchronousSavepointLatch syncSavepointLatch = srcTask.getSynchronousSavepointLatch();
 
 		ready.await();
 
@@ -96,15 +89,29 @@ public class SourceTaskTerminationTest {
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
 
-		emitAndVerifyCheckpoint(srcTaskTestHarness, srcTask, 31L);
+		srcTask.triggerCheckpoint(
+				new CheckpointMetaData(31L, 900),
+				CheckpointOptions.forCheckpointWithDefaultLocation(),
+				false);
+
+		assertFalse(syncSavepointLatch.isSet());
+		assertFalse(syncSavepointLatch.isCompleted());
+		assertFalse(syncSavepointLatch.isWaiting());
+
+		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
 
 		emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
 
-		final Thread syncSavepointThread = triggerSynchronousSavepointFromDifferentThread(srcTask, expectMaxWatermark, syncSavepointId);
+		srcTask.triggerCheckpoint(
+				new CheckpointMetaData(syncSavepointId, 900),
+				new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
+				withMaxWatermark);
 
-		final SynchronousSavepointLatch syncSavepointFuture = waitForSyncSavepointFutureToBeSet(srcTask);
+		assertTrue(syncSavepointLatch.isSet());
+		assertFalse(syncSavepointLatch.isCompleted());
+		assertFalse(syncSavepointLatch.isWaiting());
 
-		if (expectMaxWatermark) {
+		if (withMaxWatermark) {
 			// if we are in TERMINATE mode, we expect the source task
 			// to emit MAX_WM before the SYNC_SAVEPOINT barrier.
 			verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK);
@@ -112,54 +119,12 @@ public class SourceTaskTerminationTest {
 
 		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), syncSavepointId);
 
-		assertFalse(syncSavepointFuture.isCompleted());
-		assertTrue(syncSavepointFuture.isWaiting());
-
 		srcTask.notifyCheckpointComplete(syncSavepointId);
-		assertTrue(syncSavepointFuture.isCompleted());
+		assertTrue(syncSavepointLatch.isCompleted());
 
-		syncSavepointThread.join();
 		executionThread.join();
 	}
 
-	private void validateNoExceptionsWereThrown() {
-		if (error.get() != null && !(error.get() instanceof CancelTaskException)) {
-			fail(error.get().getMessage());
-		}
-	}
-
-	private Thread triggerSynchronousSavepointFromDifferentThread(
-			final StreamTask<Long, ?> task,
-			final boolean advanceToEndOfEventTime,
-			final long syncSavepointId) {
-		final Thread checkpointingThread = new Thread(() -> {
-			try {
-				task.triggerCheckpoint(
-						new CheckpointMetaData(syncSavepointId, 900),
-						new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()),
-						advanceToEndOfEventTime);
-			} catch (Exception e) {
-				error.set(e);
-			}
-		});
-		checkpointingThread.start();
-
-		return checkpointingThread;
-
-	}
-
-	private void emitAndVerifyCheckpoint(
-			final StreamTaskTestHarness<Long> srcTaskTestHarness,
-			final StreamTask<Long, ?> srcTask,
-			final long checkpointId) throws Exception {
-
-		srcTask.triggerCheckpoint(
-				new CheckpointMetaData(checkpointId, 900),
-				CheckpointOptions.forCheckpointWithDefaultLocation(),
-				false);
-		verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), checkpointId);
-	}
-
 	private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
 		final StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<>(
 				SourceStreamTask::new,
@@ -177,16 +142,6 @@ public class SourceTaskTerminationTest {
 		return testHarness;
 	}
 
-	private SynchronousSavepointLatch waitForSyncSavepointFutureToBeSet(final StreamTask streamTaskUnderTest) throws InterruptedException {
-		final SynchronousSavepointLatch syncSavepointFuture = streamTaskUnderTest.getSynchronousSavepointLatch();
-		while (!syncSavepointFuture.isWaiting()) {
-			Thread.sleep(10L);
-
-			validateNoExceptionsWereThrown();
-		}
-		return syncSavepointFuture;
-	}
-
 	private void emitAndVerifyWatermarkAndElement(
 			final StreamTaskTestHarness<Long> srcTaskTestHarness,
 			final long expectedElement) throws InterruptedException {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index af09716..eee3a62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
@@ -69,12 +68,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -89,8 +86,6 @@ import static org.mockito.Mockito.when;
  */
 public class SynchronousCheckpointITCase {
 
-	private static OneShotLatch checkpointTriggered = new OneShotLatch();
-
 	// A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread
 	// to synchronize actions with the task's threads.
 	private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
@@ -99,7 +94,7 @@ public class SynchronousCheckpointITCase {
 	public final Timeout timeoutPerTest = Timeout.seconds(10);
 
 	@Test
-	public void taskCachedThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
+	public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
 		final Task task = createTask(SynchronousCheckpointTestingTask.class);
 
 		try (TaskCleaner ignored = new TaskCleaner(task)) {
@@ -110,12 +105,6 @@ public class SynchronousCheckpointITCase {
 
 			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
 
-			// Hack: we are triggering a checkpoint with advanceToEndOfEventTime = true, to be sure that
-			// triggerCheckpointBarrier has reached the sync checkpoint latch (by verifying in
-			// SynchronousCheckpointTestingTask.advanceToEndOfEventTime) and only then proceeding to
-			// notifyCheckpointComplete.
-			// Without such synchronization, the notifyCheckpointComplete execution may be executed first and leave this
-			// test in a deadlock.
 			task.triggerCheckpointBarrier(
 					42,
 					156865867234L,
@@ -123,16 +112,13 @@ public class SynchronousCheckpointITCase {
 					true);
 
 			assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT));
+			assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT));
 			assertTrue(eventQueue.isEmpty());
 
-			checkpointTriggered.await();
-
 			task.notifyCheckpointComplete(42);
 
 			assertThat(eventQueue.take(), is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE));
-			assertThat(
-				Arrays.asList(eventQueue.take(), eventQueue.take()),
-				containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, Event.POST_TRIGGER_CHECKPOINT));
+			assertThat(eventQueue.take(), is(Event.POST_NOTIFY_CHECKPOINT_COMPLETE));
 			assertTrue(eventQueue.isEmpty());
 
 			assertEquals(ExecutionState.RUNNING, task.getExecutionState());
@@ -197,13 +183,6 @@ public class SynchronousCheckpointITCase {
 		protected void cleanup() {
 
 		}
-
-		@Override
-		protected void advanceToEndOfEventTime() throws Exception {
-			// Wake up the test thread that we have actually entered the checkpoint invocation and the sync checkpoint
-			// latch is set.
-			checkpointTriggered.trigger();
-		}
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
index fa2f9b7..59f5a81 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.util.function.RunnableWithException;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -53,120 +56,127 @@ public class SynchronousSavepointSyncLatchTest {
 	}
 
 	@Test
-	public void waitAndThenTriggerWorks() throws Exception {
+	public void triggerUnblocksWait() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
-		final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
 
-		executors.submit(callable);
+		latchUnderTest.setCheckpointId(1L);
+		assertFalse(latchUnderTest.isWaiting());
 
-		while (!latchUnderTest.isSet()) {
+		Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+		while (!latchUnderTest.isWaiting()) {
 			Thread.sleep(5L);
 		}
 
+		final AtomicBoolean triggered = new AtomicBoolean();
+
 		// wrong checkpoint id.
-		latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(2L, () -> triggered.set(true));
+		assertFalse(triggered.get());
+		assertFalse(latchUnderTest.isCompleted());
 		assertTrue(latchUnderTest.isWaiting());
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+		assertTrue(triggered.get());
 		assertTrue(latchUnderTest.isCompleted());
+
+		future.get();
+		assertFalse(latchUnderTest.isWaiting());
 	}
 
 	@Test
-	public void waitAndThenCancelWorks() throws Exception {
+	public void cancelUnblocksWait() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
-		final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L);
 
-		final Future<Boolean> resultFuture = executors.submit(callable);
+		latchUnderTest.setCheckpointId(1L);
+		assertFalse(latchUnderTest.isWaiting());
 
-		while (!latchUnderTest.isSet()) {
+		Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest);
+		while (!latchUnderTest.isWaiting()) {
 			Thread.sleep(5L);
 		}
 
 		latchUnderTest.cancelCheckpointLatch();
-
-		boolean result = resultFuture.get();
-
-		assertFalse(result);
 		assertTrue(latchUnderTest.isCanceled());
+
+		future.get();
+		assertFalse(latchUnderTest.isWaiting());
 	}
 
 	@Test
-	public void triggeringReturnsTrueAtMostOnce() throws Exception {
+	public void waitAfterTriggerIsNotBlocking() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-		final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-
-		final Future<Boolean> firstFuture = executors.submit(firstCallable);
-		final Future<Boolean> secondFuture = executors.submit(secondCallable);
+		latchUnderTest.setCheckpointId(1L);
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> {});
 
-		while (!latchUnderTest.isSet()) {
-			Thread.sleep(5L);
-		}
+		latchUnderTest.blockUntilCheckpointIsAcknowledged();
+	}
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+	@Test
+	public void waitAfterCancelIsNotBlocking() throws Exception {
+		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final boolean firstResult = firstFuture.get();
-		final boolean secondResult = secondFuture.get();
+		latchUnderTest.setCheckpointId(1L);
+		latchUnderTest.cancelCheckpointLatch();
+		assertTrue(latchUnderTest.isCanceled());
 
-		// only one of the two can be true (it is a race so we do not know which one)
-		assertTrue(firstResult ^ secondResult);
+		latchUnderTest.blockUntilCheckpointIsAcknowledged();
 	}
 
 	@Test
-	public void waitAfterTriggerReturnsTrueImmediately() throws Exception {
+	public void triggeringInvokesCallbackAtMostOnce() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
 		latchUnderTest.setCheckpointId(1L);
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-		final boolean triggerred = latchUnderTest.blockUntilCheckpointIsAcknowledged();
-		assertTrue(triggerred);
+
+		AtomicInteger counter = new AtomicInteger();
+		Future<Void> future1 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		Future<Void> future2 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		Future<Void> future3 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+		future1.get();
+		future2.get();
+		future3.get();
+
+		assertEquals(1, counter.get());
 	}
 
 	@Test
-	public void waitAfterCancelDoesNothing() throws Exception {
+	public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
+
 		latchUnderTest.setCheckpointId(1L);
 		latchUnderTest.cancelCheckpointLatch();
-		latchUnderTest.blockUntilCheckpointIsAcknowledged();
+		assertTrue(latchUnderTest.isCanceled());
+
+		final AtomicBoolean triggered = new AtomicBoolean();
+		latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true));
+		assertFalse(triggered.get());
 	}
 
 	@Test
-	public void checkpointIdIsSetOnlyOnce() throws InterruptedException {
+	public void checkpointIdIsSetOnlyOnce() {
 		final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch();
 
-		final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L);
-		executors.submit(firstCallable);
-
-		while (!latchUnderTest.isSet()) {
-			Thread.sleep(5L);
-		}
-
-		final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 2L);
-		executors.submit(secondCallable);
-
-		latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
-		assertTrue(latchUnderTest.isWaiting());
+		latchUnderTest.setCheckpointId(1L);
+		assertTrue(latchUnderTest.isSet());
+		assertEquals(1L, latchUnderTest.getCheckpointId());
 
-		latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-		assertTrue(latchUnderTest.isCompleted());
+		latchUnderTest.setCheckpointId(2L);
+		assertTrue(latchUnderTest.isSet());
+		assertEquals(1L, latchUnderTest.getCheckpointId());
 	}
 
-	private static final class WaitingOnLatchCallable implements Callable<Boolean> {
-
-		private final SynchronousSavepointLatch latch;
-		private final long checkpointId;
-
-		WaitingOnLatchCallable(
-				final SynchronousSavepointLatch latch,
-				final long checkpointId) {
-			this.latch = checkNotNull(latch);
-			this.checkpointId = checkpointId;
-		}
+	private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) {
+		return executors.submit(() -> {
+			latch.blockUntilCheckpointIsAcknowledged();
+			return null;
+		});
+	}
 
-		@Override
-		public Boolean call() throws Exception {
-			latch.setCheckpointId(checkpointId);
-			return latch.blockUntilCheckpointIsAcknowledged();
-		}
+	private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long checkpointId, RunnableWithException runnable) {
+		return executors.submit(() -> {
+			latch.acknowledgeCheckpointAndTrigger(checkpointId, runnable);
+			return null;
+		});
 	}
 }