You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:10 UTC

[flink] 07/11: [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37f7db38290df065669178cc6407edd5055f1951
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:19:30 2020 +0200

    [FLINK-16986][coordination] (part 2) Make OperatorCoordinatorHolder driven by main thread executor
---
 .../coordination/OperatorCoordinatorHolder.java    | 132 +++--
 .../operators/coordination/OperatorEventValve.java |  62 ++-
 .../flink/runtime/scheduler/SchedulerBase.java     |   2 +-
 .../OperatorCoordinatorHolderTest.java             | 556 +++++++++++++++++++++
 .../coordination/OperatorEventValveTest.java       |  46 +-
 5 files changed, 725 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 304eb20..9e68c7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
@@ -57,19 +59,16 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
 
-	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
-
 	private final OperatorCoordinator coordinator;
 	private final OperatorID operatorId;
 	private final LazyInitializedCoordinatorContext context;
-
 	private final OperatorEventValve eventValve;
 
-	// these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
 	private final int operatorParallelism;
 	private final int operatorMaxParallelism;
 
-	private volatile long currentlyTriggeredCheckpoint;
+	private Consumer<Throwable> globalFailureHandler;
+	private ComponentMainThreadExecutor mainThreadExecutor;
 
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
@@ -85,17 +84,17 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 		this.eventValve = checkNotNull(eventValve);
 		this.operatorParallelism = operatorParallelism;
 		this.operatorMaxParallelism = operatorMaxParallelism;
-
-		this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
 	}
 
-	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+	public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor mainThreadExecutor) {
+		lazyInitialize(scheduler::handleGlobalFailure, mainThreadExecutor);
 	}
 
 	@VisibleForTesting
-	void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
-		context.lazyInitialize(globalFailureHandler, schedulerExecutor);
+	void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
+		this.globalFailureHandler = globalFailureHandler;
+		this.mainThreadExecutor = mainThreadExecutor;
+		context.lazyInitialize(globalFailureHandler, mainThreadExecutor);
 	}
 
 	// ------------------------------------------------------------------------
@@ -127,6 +126,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void start() throws Exception {
+		mainThreadExecutor.assertRunningInMainThread();
 		checkState(context.isInitialized(), "Coordinator Context is not yet initialized");
 		coordinator.start();
 	}
@@ -139,78 +139,112 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		mainThreadExecutor.assertRunningInMainThread();
 		coordinator.handleEventFromOperator(subtask, event);
 	}
 
 	@Override
 	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+		mainThreadExecutor.assertRunningInMainThread();
 		coordinator.subtaskFailed(subtask, reason);
 		eventValve.resetForTask(subtask);
 	}
 
 	@Override
-	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-		setCurrentlyTriggeredCheckpoint(checkpointId);
-
-		final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
-
-		// synchronously!!!, with the completion, we need to shut the event valve
-		checkpointFuture.whenComplete((ignored, failure) -> {
-			if (failure != null) {
-				abortCurrentTriggering();
-			} else {
-				onCheckpointStateFutureComplete(checkpointId);
-			}
-		});
-
-		return checkpointFuture;
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		// unfortunately, this method does not run in the scheduler executor, but in the
+		// checkpoint coordinator timer thread.
+		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+		// main thread executor
+		final CompletableFuture<byte[]> future = new CompletableFuture<>();
+		mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, future));
+		return future;
 	}
 
 	@Override
 	public void checkpointComplete(long checkpointId) {
-		coordinator.checkpointComplete(checkpointId);
+		// unfortunately, this method does not run in the scheduler executor, but in the
+		// checkpoint coordinator timer thread.
+		// we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+		// main thread executor
+		mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId));
 	}
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
-		resetCheckpointTriggeringCheck();
+		// ideally we would like to check this here, however this method is called early during
+		// execution graph construction, before the main thread executor is set
+
 		eventValve.reset();
 		coordinator.resetToCheckpoint(checkpointData);
 	}
 
+	private void checkpointCompleteInternal(long checkpointId) {
+		mainThreadExecutor.assertRunningInMainThread();
+		coordinator.checkpointComplete(checkpointId);
+	}
+
+	private void checkpointCoordinatorInternal(final long checkpointId, final CompletableFuture<byte[]> result) {
+		mainThreadExecutor.assertRunningInMainThread();
+
+		final CompletableFuture<byte[]> checkpointFuture;
+		try {
+			eventValve.markForCheckpoint(checkpointId);
+			checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			result.completeExceptionally(t);
+			globalFailureHandler.accept(t);
+			return;
+		}
+
+		// synchronously!!!, with the completion, we need to shut the event valve
+		checkpointFuture.whenComplete((success, failure) -> {
+			if (failure != null) {
+				result.completeExceptionally(failure);
+			} else {
+				try {
+					eventValve.shutValve(checkpointId);
+					result.complete(success);
+				} catch (Exception e) {
+					result.completeExceptionally(e);
+				}
+			}
+		});
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing Callbacks
 	// ------------------------------------------------------------------------
 
 	@Override
 	public void afterSourceBarrierInjection(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		eventValve.openValve();
-		resetCheckpointTriggeringCheck();
-	}
+		// this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
 
-	@Override
-	public void abortCurrentTriggering() {
-		eventValve.openValve();
-		resetCheckpointTriggeringCheck();
-	}
+		// we ideally want the scheduler main-thread to be the one that sends the blocked events
+		// however, we need to react synchronously here, to maintain consistency and not allow
+		// another checkpoint injection in-between (unlikely, but possible).
+		// fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+		// thread safe.
 
-	void onCheckpointStateFutureComplete(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		eventValve.shutValve();
+		// this will automatically be fixed once the checkpoint coordinator runs in the
+		// scheduler's main thread executor
+		eventValve.openValveAndUnmarkCheckpoint();
 	}
 
-	private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
-		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
-	}
+	@Override
+	public void abortCurrentTriggering() {
+		// this method is commonly called by the CheckpointCoordinator's executor thread (timer thread).
 
-	private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
-		verifyNoOtherCheckpointBeingTriggered(checkpointId);
-		currentlyTriggeredCheckpoint = checkpointId;
-	}
+		// we ideally want the scheduler main-thread to be the one that sends the blocked events
+		// however, we need to react synchronously here, to maintain consistency and not allow
+		// another checkpoint injection in-between (unlikely, but possible).
+		// fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
+		// thread safe.
 
-	private void resetCheckpointTriggeringCheck() {
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+		// this will automatically be fixed once the checkpoint coordinator runs in the
+		// scheduler's main thread executor
+		eventValve.openValveAndUnmarkCheckpoint();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index bc1bfcf..f63df4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
@@ -41,7 +42,9 @@ import java.util.function.BiFunction;
  *
  * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe.
  */
-public final class OperatorEventValve {
+final class OperatorEventValve {
+
+	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
 	private final Object lock = new Object();
 
@@ -52,6 +55,12 @@ public final class OperatorEventValve {
 	private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
 
 	@GuardedBy("lock")
+	private long currentCheckpointId;
+
+	@GuardedBy("lock")
+	private long lastCheckpointId;
+
+	@GuardedBy("lock")
 	private boolean shut;
 
 	/**
@@ -60,6 +69,8 @@ public final class OperatorEventValve {
 	 */
 	public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
 		this.eventSender = eventSender;
+		this.currentCheckpointId = NO_CHECKPOINT;
+		this.lastCheckpointId = Long.MIN_VALUE;
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,21 +107,52 @@ public final class OperatorEventValve {
 	 * Shuts the value. All events sent through this valve are blocked until the valve is re-opened.
 	 * If the valve is already shut, this does nothing.
 	 */
-	public void shutValve() {
-		// synchronized block for visibility
+	public void markForCheckpoint(long checkpointId) {
 		synchronized (lock) {
-			shut = true;
+			if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+				throw new IllegalStateException(String.format(
+						"Cannot mark for checkpoint %d, already marked for checkpoint %d",
+						checkpointId, currentCheckpointId));
+			}
+			if (checkpointId > lastCheckpointId) {
+				currentCheckpointId = checkpointId;
+				lastCheckpointId = checkpointId;
+			} else {
+				throw new IllegalStateException(String.format(
+						"Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+						lastCheckpointId, checkpointId));
+			}
+		}
+	}
+
+	/**
+	 * Shuts the valve for the marked checkpoint. All events sent through this valve are blocked until the
+	 * valve is re-opened. Throws an IllegalStateException if the given checkpoint is not the marked one.
+	 */
+	public void shutValve(long checkpointId) {
+		synchronized (lock) {
+			if (checkpointId == currentCheckpointId) {
+				shut = true;
+			} else {
+				throw new IllegalStateException(String.format(
+					"Cannot shut valve for non-prepared checkpoint. " +
+					"Prepared checkpoint = %s, attempting-to-close checkpoint = %d",
+					(currentCheckpointId == NO_CHECKPOINT ? "(none)" : String.valueOf(currentCheckpointId)),
+					checkpointId));
+			}
 		}
 	}
 
 	/**
 	 * Opens the value, releasing all buffered events.
 	 */
-	public void openValve() {
+	public void openValveAndUnmarkCheckpoint() {
 		final ArrayList<FuturePair> futures;
 
 		// send all events under lock, so that no new event can sneak between
 		synchronized (lock) {
+			currentCheckpointId = NO_CHECKPOINT;
+
 			if (!shut) {
 				return;
 			}
@@ -129,14 +171,7 @@ public final class OperatorEventValve {
 
 		// apply the logic on the future outside the lock, to be safe
 		for (FuturePair pair : futures) {
-			final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
-			pair.ackFuture.whenComplete((success, failure) -> {
-				if (failure != null) {
-					originalFuture.completeExceptionally(failure);
-				} else {
-					originalFuture.complete(success);
-				}
-			});
+			FutureUtils.forward(pair.ackFuture, pair.originalFuture);
 		}
 	}
 
@@ -165,6 +200,7 @@ public final class OperatorEventValve {
 			}
 			blockedEvents.clear();
 			shut = false;
+			currentCheckpointId = NO_CHECKPOINT;
 		}
 
 		failAllFutures(events);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ce1d8a1..1a10d6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -970,7 +970,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 	}
 
-	private void initializeOperatorCoordinators(Executor mainThreadExecutor) {
+	private void initializeOperatorCoordinators(ComponentMainThreadExecutor mainThreadExecutor) {
 		for (OperatorCoordinatorHolder coordinatorHolder : getAllCoordinators()) {
 			coordinatorHolder.lazyInitialize(this, mainThreadExecutor);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
new file mode 100644
index 0000000..fa4e7cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A test that ensures the before/after conditions around event sending and checkpoints are met,
+ * including under concurrency.
+ */
+@SuppressWarnings("serial")
+public class OperatorCoordinatorHolderTest extends TestLogger {
+
+	private final Consumer<Throwable> globalFailureHandler = (t) -> globalFailure = t;
+	private Throwable globalFailure;
+
+	@After
+	public void checkNoGlobalFailure() throws Exception {
+		if (globalFailure != null) {
+			ExceptionUtils.rethrowException(globalFailure);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void checkpointFutureInitiallyNotDone() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(1L);
+		assertFalse(checkpointFuture.isDone());
+	}
+
+	@Test
+	public void completedCheckpointFuture() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final byte[] testData = new byte[] {11, 22, 33, 44};
+
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(9L);
+		getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);
+
+		assertTrue(checkpointFuture.isDone());
+		assertArrayEquals(testData, checkpointFuture.get());
+	}
+
+	@Test
+	public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		holder.checkpointCoordinator(1L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1), 1)
+		));
+	}
+
+	@Test
+	public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 10L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void abortedCheckpointReleasesBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 123L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+		holder.abortCurrentTriggering();
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1337), 0)
+		));
+	}
+
+	@Test
+	public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1111L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
+		holder.afterSourceBarrierInjection(1111L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(1337), 0)
+		));
+	}
+
+	@Test
+	public void failedTasksDropsBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+		holder.subtaskFailed(1, null);
+		holder.abortCurrentTriggering();
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0)
+		));
+	}
+
+	@Test
+	public void restoreOpensValveEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		holder.resetToCheckpoint(new byte[0]);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(999), 1)
+		));
+	}
+
+	@Test
+	public void restoreDropsBlockedEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		triggerAndCompleteCheckpoint(holder, 1000L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);
+		holder.resetToCheckpoint(new byte[0]);
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		final CompletableFuture<byte[]> holderFuture = holder.checkpointCoordinator(1000L);
+		final CompletableFuture<byte[]> future1 = getCoordinator(holder).getLastTriggeredCheckpoint();
+		holder.abortCurrentTriggering();
+
+		triggerAndCompleteCheckpoint(holder, 1010L);
+		holder.afterSourceBarrierInjection(1010L);
+
+		future1.complete(new byte[0]);
+
+		assertTrue(holderFuture.isCompletedExceptionally());
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(123), 0)
+		));
+	}
+
+	@Test
+	public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		holder.checkpointCoordinator(11L);
+		final CompletableFuture<?> future = holder.checkpointCoordinator(12L);
+
+		assertTrue(future.isCompletedExceptionally());
+		assertNotNull(globalFailure);
+		globalFailure = null;
+	}
+
+	@Test
+	public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+		triggerAndCompleteCheckpoint(holder, 22L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+		holder.afterSourceBarrierInjection(22L);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+		triggerAndCompleteCheckpoint(holder, 23L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+		holder.afterSourceBarrierInjection(23L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0),
+			new EventWithSubtask(new TestOperatorEvent(1), 0),
+			new EventWithSubtask(new TestOperatorEvent(2), 0),
+			new EventWithSubtask(new TestOperatorEvent(3), 0)
+		));
+	}
+
+	@Test
+	public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
+
+		triggerAndCompleteCheckpoint(holder, 22L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
+		holder.abortCurrentTriggering();
+
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);
+
+		triggerAndCompleteCheckpoint(holder, 23L);
+		getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
+		holder.afterSourceBarrierInjection(23L);
+
+		assertThat(sender.events, contains(
+			new EventWithSubtask(new TestOperatorEvent(0), 0),
+			new EventWithSubtask(new TestOperatorEvent(1), 0),
+			new EventWithSubtask(new TestOperatorEvent(2), 0),
+			new EventWithSubtask(new TestOperatorEvent(3), 0)
+		));
+	}
+
+	/**
+	 * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+	 * outside matches that from within the OperatorCoordinator.
+	 *
+	 * <p>Extreme case 1: The coordinator immediately completes the checkpoint future and sends an
+	 * event directly after that.
+	 */
+	@Test
+	public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
+		checkpointEventValueAtomicity(FutureCompletedInstantlyTestCoordinator::new);
+	}
+
+	/**
+	 * This test verifies that the order of Checkpoint Completion and Event Sending observed from the
+	 * outside matches that from within the OperatorCoordinator.
+	 *
+	 * <p>Extreme case 2: After the checkpoint triggering, the coordinator flushes a bunch of events
+	 * before completing the checkpoint future.
+	 */
+	@Test
+	public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
+		checkpointEventValueAtomicity(FutureCompletedAfterSendingEventsCoordinator::new);
+	}
+
+	private void checkpointEventValueAtomicity(
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+		final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
+		final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(
+				(ScheduledExecutorService) executor, Thread.currentThread());
+
+		final TestEventSender sender = new TestEventSender();
+		final OperatorCoordinatorHolder holder = createCoordinatorHolder(
+				sender, coordinatorCtor, mainThreadExecutor);
+
+		// give the coordinator some time to emit some events
+		Thread.sleep(new Random().nextInt(10) + 20);
+		executor.triggerAll();
+
+		// trigger the checkpoint - this should also shut the valve as soon as the future is completed
+		final CompletableFuture<byte[]> checkpointFuture = holder.checkpointCoordinator(0L);
+		executor.triggerAll();
+
+		// give the coordinator some time to emit some events
+		Thread.sleep(new Random().nextInt(10) + 10);
+		holder.close();
+		executor.triggerAll();
+
+		assertTrue(checkpointFuture.isDone());
+		final int checkpointedNumber = bytesToInt(checkpointFuture.get());
+
+		assertEquals(checkpointedNumber, sender.events.size());
+		for (int i = 0; i < checkpointedNumber; i++) {
+			assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//   test actions
+	// ------------------------------------------------------------------------
+
+	private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(
+			OperatorCoordinatorHolder holder,
+			long checkpointId) throws Exception {
+
+		final CompletableFuture<byte[]> future = holder.checkpointCoordinator(checkpointId);
+		getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+		return future;
+	}
+
+	// ------------------------------------------------------------------------
+	//   miscellaneous helpers
+	// ------------------------------------------------------------------------
+
+	static byte[] intToBytes(int value) {
+		return ByteBuffer.allocate(4).putInt(value).array();
+	}
+
+	static int bytesToInt(byte[] bytes) {
+		return ByteBuffer.wrap(bytes).getInt();
+	}
+
+	private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) {
+		return (TestingOperatorCoordinator) holder.coordinator();
+	}
+
+	private OperatorCoordinatorHolder createCoordinatorHolder(
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
+
+		return createCoordinatorHolder(
+				eventSender,
+				coordinatorCtor,
+				ComponentMainThreadExecutorServiceAdapter.forMainThread());
+	}
+
+	/**
+	 * Creates a holder around a coordinator built by {@code coordinatorCtor}. The holder is
+	 * lazily initialized with the test's global failure handler and the given main thread
+	 * executor, then started.
+	 */
+	private OperatorCoordinatorHolder createCoordinatorHolder(
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor,
+			final ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
+
+		final OperatorID operatorId = new OperatorID();
+
+		// the provider reports the fixed operator ID and delegates creation to the given constructor
+		final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider() {
+			@Override
+			public OperatorID getOperatorId() {
+				return operatorId;
+			}
+
+			@Override
+			public OperatorCoordinator create(OperatorCoordinator.Context context) {
+				return coordinatorCtor.apply(context);
+			}
+		};
+
+		final String operatorName = "test-coordinator-name";
+		final int operatorParallelism = 3;
+		final int operatorMaxParallelism = 1775;
+
+		final OperatorCoordinatorHolder coordinatorHolder = OperatorCoordinatorHolder.create(
+				operatorId, coordinatorProvider, eventSender, operatorName, operatorParallelism, operatorMaxParallelism);
+
+		coordinatorHolder.lazyInitialize(globalFailureHandler, mainThreadExecutor);
+		coordinatorHolder.start();
+		return coordinatorHolder;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test implementations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Coordinator whose checkpoint future is completed, and followed by an event, by the
+	 * coordinator thread before {@link #checkpointCoordinator(long)} returns the future.
+	 */
+	private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+		private final ReentrantLock lock = new ReentrantLock(true);
+		private final Condition condition = lock.newCondition();
+
+		@Nullable
+		@GuardedBy("lock")
+		private CompletableFuture<byte[]> checkpoint;
+
+		/** Event counter; only read and written in {@link #step()}, i.e. on the coordinator thread. */
+		private int num;
+
+		FutureCompletedInstantlyTestCoordinator(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			// we create the checkpoint future, but before returning it, we wait on a
+			// condition. that way, we simulate a "context switch" just at the time when the
+			// future would be returned and make the other thread complete the future and send an
+			// event before this method returns
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			lock.lock();
+			try {
+				checkpoint = checkpointFuture;
+				// Condition.await() may wake up spuriously, so keep waiting until the coordinator
+				// thread has actually completed this future (it sends its event under the same
+				// lock hold, before signalling)
+				while (!checkpointFuture.isDone()) {
+					condition.await();
+				}
+				return checkpointFuture;
+			} finally {
+				lock.unlock();
+			}
+		}
+
+		@Override
+		protected void step() throws Exception {
+			lock.lock();
+			try {
+				// if there is a checkpoint to complete, we complete it and immediately
+				// try to send another event, without releasing the lock. that way we
+				// force the situation as if the checkpoint get completed and an event gets
+				// sent while the triggering thread is stalled
+				if (checkpoint != null) {
+					checkpoint.complete(intToBytes(num));
+					checkpoint = null;
+				}
+				context.sendEvent(new TestOperatorEvent(num++), 0);
+				condition.signalAll();
+			} finally {
+				lock.unlock();
+			}
+
+			Thread.sleep(2);
+		}
+	}
+
+	/**
+	 * Coordinator that, on each step, sends three events and afterwards completes any pending
+	 * checkpoint future. The future therefore always completes after the events of that step.
+	 */
+	private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator {
+
+		/**
+		 * Guards the hand-over of {@link #checkpoint}. The field is written by the thread that
+		 * triggers the checkpoint and consumed by the coordinator thread in {@link #step()}.
+		 */
+		private final Object checkpointLock = new Object();
+
+		@Nullable
+		@GuardedBy("checkpointLock")
+		private CompletableFuture<byte[]> checkpoint;
+
+		/** Event counter; only read and written in {@link #step()}, i.e. on the coordinator thread. */
+		private int num;
+
+		FutureCompletedAfterSendingEventsCoordinator(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			synchronized (checkpointLock) {
+				checkpoint = checkpointFuture;
+			}
+			return checkpointFuture;
+		}
+
+		@Override
+		protected void step() throws Exception {
+			Thread.sleep(2);
+
+			context.sendEvent(new TestOperatorEvent(num++), 0);
+			context.sendEvent(new TestOperatorEvent(num++), 1);
+			context.sendEvent(new TestOperatorEvent(num++), 2);
+
+			// take the pending checkpoint atomically, so that a checkpoint set concurrently is
+			// neither missed (visibility) nor overwritten with null before being completed
+			final CompletableFuture<byte[]> pendingCheckpoint;
+			synchronized (checkpointLock) {
+				pendingCheckpoint = checkpoint;
+				checkpoint = null;
+			}
+
+			// complete outside the lock, so that actions dependent on the future do not run
+			// while holding it
+			if (pendingCheckpoint != null) {
+				pendingCheckpoint.complete(intToBytes(num));
+			}
+		}
+	}
+
+	/**
+	 * Base for test coordinators that call {@link #step()} repeatedly in a dedicated thread,
+	 * from {@link #start()} until {@link #close()}. All coordinator callbacks are no-ops, except
+	 * {@link #checkpointCoordinator(long)}, which subclasses implement.
+	 */
+	private abstract static class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable {
+
+		protected final Context context;
+
+		private final Thread coordinatorThread;
+
+		private volatile boolean closed;
+
+		CheckpointEventOrderTestBaseCoordinator(Context context) {
+			this.context = context;
+			this.coordinatorThread = new Thread(this);
+		}
+
+		@Override
+		public void start() throws Exception {
+			coordinatorThread.start();
+		}
+
+		@Override
+		public void close() throws Exception {
+			// set the flag first, so the loop and the failure handler observe the shutdown, then
+			// interrupt a possibly sleeping/waiting coordinator thread and wait for it to end
+			closed = true;
+			coordinatorThread.interrupt();
+			coordinatorThread.join();
+		}
+
+		@Override
+		public void handleEventFromOperator(int subtask, OperatorEvent event) {}
+
+		@Override
+		public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
+
+		@Override
+		public abstract CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+
+		@Override
+		public void checkpointComplete(long checkpointId) {}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {}
+
+		@Override
+		public void run() {
+			try {
+				runStepsUntilClosed();
+			} catch (Throwable t) {
+				// failures after close() are expected (e.g. from the interrupt) and are ignored.
+				// any other failure should never happen, so print it and terminate the JVM
+				if (!closed) {
+					//noinspection CallToPrintStackTrace
+					t.printStackTrace();
+					System.exit(-1);
+				}
+			}
+		}
+
+		/** Calls {@link #step()} repeatedly until the coordinator is closed. */
+		private void runStepsUntilClosed() throws Exception {
+			while (!closed) {
+				step();
+			}
+		}
+
+		/** One iteration of the work loop; invoked repeatedly on the coordinator thread. */
+		protected abstract void step() throws Exception;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
index 9b0325d..a380958 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -50,11 +50,30 @@ public class OperatorEventValveTest {
 		assertTrue(future.isDone());
 	}
 
+	@Test(expected = IllegalStateException.class)
+	public void errorShuttingUnmarkedValve() throws Exception {
+		// no checkpoint was marked on the valve, so shutting it is expected to be rejected
+		final OperatorEventValve valve = new OperatorEventValve(new TestEventSender());
+		valve.shutValve(123L);
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void errorShuttingValveForOtherMark() throws Exception {
+		final OperatorEventValve valve = new OperatorEventValve(new TestEventSender());
+
+		// the valve is marked for one checkpoint; shutting it for a different one is expected to be rejected
+		final long markedCheckpointId = 100L;
+		final long otherCheckpointId = 123L;
+		valve.markForCheckpoint(markedCheckpointId);
+		valve.shutValve(otherCheckpointId);
+	}
+
 	@Test
 	public void eventsBlockedByClosedValve() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(1L);
+		valve.shutValve(1L);
 
 		final CompletableFuture<Acknowledge> future =
 				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
@@ -67,14 +86,16 @@ public class OperatorEventValveTest {
 	public void eventsReleasedAfterOpeningValve() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final OperatorEvent event1 = new TestOperatorEvent();
 		final OperatorEvent event2 = new TestOperatorEvent();
 		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
 		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);
 
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertThat(sender.events, containsInAnyOrder(
 			new EventWithSubtask(event1, 3),
@@ -88,11 +109,13 @@ public class OperatorEventValveTest {
 	public void releasedEventsForwardSendFailures() throws Exception {
 		final TestEventSender sender = new TestEventSender(new FlinkException("test"));
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final CompletableFuture<Acknowledge> future =
 				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertTrue(future.isCompletedExceptionally());
 	}
@@ -101,13 +124,14 @@ public class OperatorEventValveTest {
 	public void resetDropsAllEvents() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
 		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
 
 		valve.reset();
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertTrue(sender.events.isEmpty());
 	}
@@ -116,7 +140,8 @@ public class OperatorEventValveTest {
 	public void resetForTaskDropsSelectiveEvents() throws Exception {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 
 		final OperatorEvent event1 = new TestOperatorEvent();
 		final OperatorEvent event2 = new TestOperatorEvent();
@@ -124,7 +149,7 @@ public class OperatorEventValveTest {
 		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);
 
 		valve.resetForTask(1);
-		valve.openValve();
+		valve.openValveAndUnmarkCheckpoint();
 
 		assertThat(sender.events, contains(new EventWithSubtask(event1, 0)));
 		assertTrue(future1.isDone());
@@ -136,7 +161,8 @@ public class OperatorEventValveTest {
 		final TestEventSender sender = new TestEventSender();
 		final OperatorEventValve valve = new OperatorEventValve(sender);
 
-		valve.shutValve();
+		valve.markForCheckpoint(17L);
+		valve.shutValve(17L);
 		valve.reset();
 
 		assertFalse(valve.isShut());