You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:11 UTC

[flink] 08/11: [FLINK-16986][coordination] (part 3) Change OperatorCoordinator interface to support better exactly-once semantics

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5926e07f01f6c91a8a0265e5f4b30086572d8125
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 18:23:45 2020 +0200

    [FLINK-16986][coordination] (part 3) Change OperatorCoordinator interface to support better exactly-once semantics
    
    The semantics are defined as follows:
      - The OperatorCoordinator implementation must have a way of strictly ordering the sending of events and
        the completion of the checkpoint future (for example the same thread does both actions, or the actions
        are guarded by a mutex).
      - Every event sent before the checkpoint future is completed is considered to be before the checkpoint.
      - Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
    
    The previous interface did not allow us to observe this point accurately. The future was created inside the
    application-specific OperatorCoordinator code and returned from the methods. By the time that the scheduler/checkpointing
    code could observe the future (attach handlers to it), some (small) amount of time had inevitably passed.
    Within that time, the future could already have been completed and some events could have been sent. In that case the
    scheduler/checkpointing code could not determine which events were sent before the completion of the future and which
    events were sent after it.
    
    The changed interface passes the future from the scheduler/checkpointing code into the coordinator. The future already
    has synchronous handlers attached to it which mark exactly the point when the future was completed. This allows the
    scheduler/checkpointing code to observe the correct order in which the OperatorCoordinator implementation
    performed its actions (event sending, future completion).
---
 .../OperatorCoordinatorCheckpointContext.java      |  2 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  4 +-
 .../coordination/OperatorCoordinator.java          |  2 +-
 .../coordination/OperatorCoordinatorHolder.java    | 28 ++++++--------
 .../source/coordinator/SourceCoordinator.java      | 13 ++++---
 .../CoordinatorEventsExactlyOnceITCase.java        |  6 +--
 .../coordination/MockOperatorCoordinator.java      |  2 +-
 .../OperatorCoordinatorHolderTest.java             | 44 ++++++++++++----------
 .../OperatorCoordinatorSchedulerTest.java          |  2 +-
 .../coordination/TestingOperatorCoordinator.java   |  8 +---
 .../source/coordinator/SourceCoordinatorTest.java  | 24 +++++++++---
 .../collect/CollectSinkOperatorCoordinator.java    |  5 ++-
 12 files changed, 75 insertions(+), 65 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index abc15b8..94d4e07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
 
-	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 	void afterSourceBarrierInjection(long checkpointId);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 6215123..8de997a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -43,8 +43,8 @@ final class OperatorCoordinatorCheckpoints {
 			final OperatorCoordinatorCheckpointContext coordinatorContext,
 			final long checkpointId) throws Exception {
 
-		final CompletableFuture<byte[]> checkpointFuture =
-			coordinatorContext.checkpointCoordinator(checkpointId);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		coordinatorContext.checkpointCoordinator(checkpointId, checkpointFuture);
 
 		return checkpointFuture.thenApply(
 				(state) -> new CoordinatorSnapshot(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index cb388b2..dc06ae0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -77,7 +77,7 @@ public interface OperatorCoordinator extends AutoCloseable {
 
 	// ------------------------------------------------------------------------
 
-	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+	void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 	/**
 	 * Notifies the coordinator that the checkpoint with the given checkpointId completes and
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 9e68c7b..321b401 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -151,14 +151,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 		// unfortunately, this method does not run in the scheduler executor, but in the
 		// checkpoint coordinator time thread.
 		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
 		// main thread executor
-		final CompletableFuture<byte[]> future = new CompletableFuture<>();
-		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future));
-		return future;
+		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
 	}
 
 	@Override
@@ -187,19 +185,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
 		mainThreadExecutor.assertRunningInMainThread();
 
-		final CompletableFuture<byte[]> checkpointFuture;
-		try {
-			eventValve.markForCheckpoint(checkpointId);
-			checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
-		} catch (Throwable t) {
-			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-			result.completeExceptionally(t);
-			globalFailureHandler.accept(t);
-			return;
-		}
-
 		// synchronously!!!, with the completion, we need to shut the event valve
-		checkpointFuture.whenComplete((success, failure) -> {
+		result.whenComplete((success, failure) -> {
 			if (failure != null) {
 				result.completeExceptionally(failure);
 			} else {
@@ -211,6 +198,15 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 				}
 			}
 		});
+
+		try {
+			eventValve.markForCheckpoint(checkpointId);
+			coordinator.checkpointCoordinator(checkpointId, result);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			result.completeExceptionally(t);
+			globalFailureHandler.accept(t);
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index e0edbad..dc5f1dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -165,17 +165,18 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
 		ensureStarted();
-		return CompletableFuture.supplyAsync(() -> {
+
+		coordinatorExecutor.execute(() -> {
 			try {
 				LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId);
-				return toBytes(checkpointId);
+				result.complete(toBytes(checkpointId));
 			} catch (Exception e) {
-				throw new CompletionException(
-						String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e);
+				result.completeExceptionally(new CompletionException(
+						String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e));
 			}
-		}, coordinatorExecutor);
+		});
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 919e6fe..dacf5fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -292,10 +292,8 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
-			requestedCheckpoint = checkpointFuture;
-			return checkpointFuture;
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			requestedCheckpoint = result;
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index 5e47350..f4e8444 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -49,7 +49,7 @@ public final class MockOperatorCoordinator implements OperatorCoordinator {
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index fa4e7cf..29ec382 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -76,7 +76,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(1L, checkpointFuture);
+
 		assertFalse(checkpointFuture.isDone());
 	}
 
@@ -87,7 +89,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 
 		final byte[] testData = new byte[] {11, 22, 33, 44};
 
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(9L, checkpointFuture);
 		getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
 
 		assertTrue(checkpointFuture.isDone());
@@ -99,7 +102,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		holder.checkpointCoordinator(1L);
+		holder.checkpointCoordinator(1L, new CompletableFuture<>());
 		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
 
 		assertThat(sender.events, contains(
@@ -194,7 +197,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L);
+		final CompletableFuture<byte[]> holderFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(1000L, holderFuture);
+
 		final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint();
 		holder.abortCurrentTriggering();
 
@@ -203,7 +208,6 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 
 		future1.complete(new byte[0]);
 
-		assertTrue(holderFuture.isCompletedExceptionally());
 		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);
 
 		assertThat(sender.events, contains(
@@ -216,8 +220,10 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
 
-		holder.checkpointCoordinator(11L);
-		final CompletableFuture<?> future = holder.checkpointCoordinator(12L);
+		holder.checkpointCoordinator(11L, new CompletableFuture<>());
+
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		holder.checkpointCoordinator(12L, future);
 
 		assertTrue(future.isCompletedExceptionally());
 		assertNotNull(globalFailure);
@@ -314,7 +320,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		executor.triggerAll();
 
 		// trigger the checkpoint - this should also shut the valve as soon as the future is completed
-		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		holder.checkpointCoordinator(0L, checkpointFuture);
 		executor.triggerAll();
 
 		// give the coordinator some time to emit some events
@@ -339,7 +346,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 			OperatorCoordinatorHolder holder,
 			long checkpointId) throws Exception {
 
-		final CompletableFuture<byte[]> future = holder.checkpointCoordinator(checkpointId);
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		holder.checkpointCoordinator(checkpointId, future);
 		getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
 		return future;
 	}
@@ -422,17 +430,15 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			// we create the checkpoint future, but before returning it, we wait on a
-			// condition. that way, we simulate a "context switch" just at the time when the
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			// before returning from this methof, we wait on a condition.
+			// that way, we simulate a "context switch" just at the time when the
 			// future would be returned and make the other thread complete the future and send an
 			// event before this method returns
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
 			lock.lock();
 			try {
-				checkpoint = checkpointFuture;
+				checkpoint = result;
 				condition.await();
-				return checkpointFuture;
 			} finally {
 				lock.unlock();
 			}
@@ -472,10 +478,8 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
-			checkpoint = checkpointFuture;
-			return checkpointFuture;
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
+			checkpoint = result;
 		}
 
 		@Override
@@ -525,7 +529,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
 		public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
 
 		@Override
-		public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+		public abstract void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
 		@Override
 		public void checkpointComplete(long checkpointId) {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 6ec4dd7..353b6b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -689,7 +689,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		}
 
 		@Override
-		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
 			throw new Error(new TestException());
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index e914afe..e91bce3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -75,13 +75,9 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
-		final CompletableFuture<byte[]> coordinatorStateFuture = new CompletableFuture<>();
-
-		boolean added = triggeredCheckpoints.offer(coordinatorStateFuture);
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+		boolean added = triggeredCheckpoints.offer(result);
 		assert added; // guard the test assumptions
-
-		return coordinatorStateFuture;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index d92b9ba..be244fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
@@ -55,7 +56,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 				failureMessage, "The coordinator has not started yet.");
 		verifyException(() -> sourceCoordinator.subtaskFailed(0, null),
 				failureMessage, "The coordinator has not started yet.");
-		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L),
+		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L, new CompletableFuture<>()),
 				failureMessage, "The coordinator has not started yet.");
 	}
 
@@ -114,7 +115,10 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		sourceCoordinator.start();
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		byte[] bytes = sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
+		final byte[] bytes = checkpointFuture.get();
 
 		// restore from the checkpoints.
 		SourceCoordinator<?, ?> restoredCoordinator = getNewSourceCoordinator();
@@ -136,11 +140,17 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		// Assign some splits to reader 0 then take snapshot 100.
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture1 = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture1);
+		checkpointFuture1.get();
 
 		// Add split 6, assign it to reader 0 and take another snapshot 101.
 		enumerator.addNewSplits(Collections.singletonList(new MockSourceSplit(6)));
-		sourceCoordinator.checkpointCoordinator(101L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture2 = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(101L, checkpointFuture2);
+		checkpointFuture2.get();
 
 		// check the state.
 		check(() -> {
@@ -185,7 +195,11 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		// Assign some splits to reader 0 then take snapshot 100.
 		sourceCoordinator.handleEventFromOperator(
 				0, new ReaderRegistrationEvent(0, "location_0"));
-		sourceCoordinator.checkpointCoordinator(100L).get();
+
+		final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+		sourceCoordinator.checkpointCoordinator(100L, checkpointFuture);
+		checkpointFuture.get();
+
 		// Complete checkpoint 100.
 		sourceCoordinator.checkpointComplete(100L);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 6c84266..695e4bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -190,11 +190,12 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+	public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		ObjectOutputStream oos = new ObjectOutputStream(baos);
 		oos.writeObject(address);
-		return CompletableFuture.completedFuture(baos.toByteArray());
+
+		result.complete(baos.toByteArray());
 	}
 
 	@Override