Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:07 UTC

[flink] 04/11: [FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a721d855998a7c91415312d4890d7d4f916e163
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 18 01:46:32 2020 +0200

    [FLINK-16986][coordination] (part 1) Provide exactly-once guarantees around checkpoints and operator event sending.
    
    This closes #12234
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   8 +-
 .../OperatorCoordinatorCheckpointContext.java      |  50 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java     |   9 +-
 .../coordination/OperatorCoordinatorHolder.java    | 109 ++++-
 .../operators/coordination/OperatorEventValve.java | 209 +++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |   4 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |   8 +-
 .../CoordinatorEventsExactlyOnceITCase.java        | 501 +++++++++++++++++++++
 .../MockOperatorCoordinatorCheckpointContext.java} |  60 +--
 .../OperatorCoordinatorSchedulerTest.java          |   4 +-
 .../coordination/OperatorEventValveTest.java       | 144 ++++++
 .../operators/coordination/TestEventSender.java    | 113 +++++
 .../operators/coordination/TestOperatorEvent.java  |  33 ++
 14 files changed, 1175 insertions(+), 89 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b283a2a..51b98f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -545,13 +545,17 @@ public class CheckpointCoordinator {
 
 						if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
 							// no exception, no discarding, everything is OK
+							final long checkpointId = checkpoint.getCheckpointId();
 							snapshotTaskState(
 								timestamp,
-								checkpoint.getCheckpointId(),
+								checkpointId,
 								checkpoint.getCheckpointStorageLocation(),
 								request.props,
 								executions,
 								request.advanceToEndOfTime);
+
+							coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
+
 							onTriggerSuccess();
 						} else {
 								// the initialization might not be finished yet
@@ -777,6 +781,8 @@ public class CheckpointCoordinator {
 		throwable = ExceptionUtils.stripCompletionException(throwable);
 
 		try {
+			coordinatorsToCheckpoint.forEach(OperatorCoordinatorCheckpointContext::abortCurrentTriggering);
+
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 16acbb2..92fd2aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -24,51 +24,41 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import java.util.Collection;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link OperatorCoordinator} and its contextual information needed to trigger and
  * acknowledge a checkpoint.
  */
-public final class OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext {
 
-	private final OperatorCoordinator coordinator;
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
 
-	private final OperatorID operatorId;
+	OperatorCoordinator coordinator();
 
-	private final int maxParallelism;
+	OperatorID operatorId();
 
-	private final int currentParallelism;
+	int maxParallelism();
 
-	public OperatorCoordinatorCheckpointContext(
-			OperatorCoordinator coordinator,
-			OperatorID operatorId,
-			int maxParallelism,
-			int currentParallelism) {
+	int currentParallelism();
 
-		this.coordinator = checkNotNull(coordinator);
-		this.operatorId = checkNotNull(operatorId);
-		this.maxParallelism = maxParallelism;
-		this.currentParallelism = currentParallelism;
-	}
+	// ------------------------------------------------------------------------
+	//  checkpoint triggering callbacks
+	// ------------------------------------------------------------------------
 
-	public OperatorCoordinator coordinator() {
-		return coordinator;
-	}
+	void onCallTriggerCheckpoint(long checkpointId);
 
-	public OperatorID operatorId() {
-		return operatorId;
-	}
+	void onCheckpointStateFutureComplete(long checkpointId);
 
-	public int maxParallelism() {
-		return maxParallelism;
-	}
+	void afterSourceBarrierInjection(long checkpointId);
 
-	public int currentParallelism() {
-		return currentParallelism;
-	}
+	void abortCurrentTriggering();
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
 
-	public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
 		return infos.stream()
 			.map(OperatorCoordinatorCheckpointContext::operatorId)
 			.collect(Collectors.toList());
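
A minimal sketch (not part of this commit) of the order in which the checkpointing path is expected to invoke the new callbacks, based on the CheckpointCoordinator and OperatorCoordinatorCheckpoints hunks in this patch; the class and method names below are placeholders for illustration only:

    import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;

    final class CheckpointCallbackOrderSketch {

        /** Walks one coordinator context through a successful checkpoint triggering. */
        static void happyPath(OperatorCoordinatorCheckpointContext ctx, long checkpointId) {
            // triggerCoordinatorCheckpoint(...) was called for this coordinator
            ctx.onCallTriggerCheckpoint(checkpointId);

            // the coordinator's state snapshot future completed -> operator events get held back
            ctx.onCheckpointStateFutureComplete(checkpointId);

            // the checkpoint barriers were injected into the source tasks -> events flow again
            ctx.afterSourceBarrierInjection(checkpointId);
        }

        /** Any failure while triggering aborts the attempt and releases held-back events. */
        static void onFailure(OperatorCoordinatorCheckpointContext ctx) {
            ctx.abortCurrentTriggering();
        }
    }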
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 39ac10f..68fc8f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -58,7 +58,17 @@ final class OperatorCoordinatorCheckpoints {
 		final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
 
 		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
-			individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+			final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+			coordinator.onCallTriggerCheckpoint(checkpointId);
+
+			individualSnapshots.add(checkpointFuture);
+			checkpointFuture.whenComplete((ignored, failure) -> {
+				if (failure != null) {
+					coordinator.abortCurrentTriggering();
+				} else {
+					coordinator.onCheckpointStateFutureComplete(checkpointId);
+				}
+			});
 		}
 
 		return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b84a86f..dfd20ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.InternalFailuresListener;
 import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -569,13 +568,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
 		final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
 		for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
-			for (final OperatorCoordinatorHolder coordinator : vertex.getOperatorCoordinators()) {
-				contexts.add(new OperatorCoordinatorCheckpointContext(
-						coordinator,
-						coordinator.getOperatorId(),
-						vertex.getMaxParallelism(),
-						vertex.getParallelism()));
-			}
+			contexts.addAll(vertex.getOperatorCoordinators());
 		}
 		contexts.trimToSize();
 		return contexts;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 43c47fe..c70e229 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -51,34 +53,99 @@ import static org.apache.flink.util.Preconditions.checkState;
  * However, the real Coordinators can only be created after SchedulerNG was created, because they need
  * a reference to it for the failure calls.
  */
-public class OperatorCoordinatorHolder implements OperatorCoordinator {
+public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
+
+	private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
 	private final OperatorCoordinator coordinator;
 	private final OperatorID operatorId;
 	private final LazyInitializedCoordinatorContext context;
 
+	private final OperatorEventValve eventValve;
+
+	// these two fields are needed for the construction of OperatorStateHandles when taking checkpoints
+	private final int operatorParallelism;
+	private final int operatorMaxParallelism;
+
+	private long currentlyTriggeredCheckpoint;
+
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
 			final OperatorCoordinator coordinator,
-			final LazyInitializedCoordinatorContext context) {
+			final LazyInitializedCoordinatorContext context,
+			final OperatorEventValve eventValve,
+			final int operatorParallelism,
+			final int operatorMaxParallelism) {
 
 		this.operatorId = checkNotNull(operatorId);
 		this.coordinator = checkNotNull(coordinator);
 		this.context = checkNotNull(context);
+		this.eventValve = checkNotNull(eventValve);
+		this.operatorParallelism = operatorParallelism;
+		this.operatorMaxParallelism = operatorMaxParallelism;
+
+		this.currentlyTriggeredCheckpoint = NO_CHECKPOINT;
 	}
 
+	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
+		context.lazyInitialize(scheduler, schedulerExecutor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
 	// ------------------------------------------------------------------------
 
-	public OperatorID getOperatorId() {
+	@Override
+	public OperatorID operatorId() {
 		return operatorId;
 	}
 
-	public OperatorCoordinator getCoordinator() {
+	@Override
+	public OperatorCoordinator coordinator() {
 		return coordinator;
 	}
 
-	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		context.lazyInitialize(scheduler, schedulerExecutor);
+	@Override
+	public int maxParallelism() {
+		return operatorMaxParallelism;
+	}
+
+	@Override
+	public int currentParallelism() {
+		return operatorParallelism;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing Callbacks
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onCallTriggerCheckpoint(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		currentlyTriggeredCheckpoint = checkpointId;
+	}
+
+	@Override
+	public void onCheckpointStateFutureComplete(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		eventValve.shutValve();
+	}
+
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {
+		checkCheckpointAlreadyHappening(checkpointId);
+		eventValve.openValve();
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	@Override
+	public void abortCurrentTriggering() {
+		eventValve.openValve();
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	private void checkCheckpointAlreadyHappening(long checkpointId) {
+		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
 	}
 
 	// ------------------------------------------------------------------------
@@ -105,6 +172,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 	@Override
 	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
 		coordinator.subtaskFailed(subtask, reason);
+		eventValve.resetForTask(subtask);
 	}
 
 	@Override
@@ -134,9 +202,24 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 		try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
 			final OperatorCoordinator.Provider provider = serializedProvider.deserializeValue(classLoader);
 			final OperatorID opId = provider.getOperatorId();
-			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex);
+
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender =
+				(serializedEvent, subtask) -> {
+					final Execution executionAttempt = jobVertex.getTaskVertices()[subtask].getCurrentExecutionAttempt();
+					return executionAttempt.sendOperatorEvent(opId, serializedEvent);
+				};
+
+			final OperatorEventValve valve = new OperatorEventValve(eventSender);
+			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
 			final OperatorCoordinator coordinator = provider.create(context);
-			return new OperatorCoordinatorHolder(opId, coordinator, context);
+
+			return new OperatorCoordinatorHolder(
+					opId,
+					coordinator,
+					context,
+					valve,
+					jobVertex.getParallelism(),
+					jobVertex.getMaxParallelism());
 		}
 	}
 
@@ -157,13 +240,18 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 
 		private final OperatorID operatorId;
 		private final ExecutionJobVertex jobVertex;
+		private final OperatorEventValve eventValve;
 
 		private SchedulerNG scheduler;
 		private Executor schedulerExecutor;
 
-		public LazyInitializedCoordinatorContext(OperatorID operatorId, ExecutionJobVertex jobVertex) {
+		public LazyInitializedCoordinatorContext(
+				OperatorID operatorId,
+				ExecutionJobVertex jobVertex,
+				OperatorEventValve eventValve) {
 			this.operatorId = checkNotNull(operatorId);
 			this.jobVertex = checkNotNull(jobVertex);
+			this.eventValve = checkNotNull(eventValve);
 		}
 
 		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
@@ -208,8 +296,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator {
 				throw new FlinkRuntimeException("Cannot serialize operator event", e);
 			}
 
-			final Execution executionAttempt = jobVertex.getTaskVertices()[targetSubtask].getCurrentExecutionAttempt();
-			return executionAttempt.sendOperatorEvent(operatorId, serializedEvent);
+			return eventValve.sendEvent(serializedEvent, targetSubtask);
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
new file mode 100644
index 0000000..bc1bfcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * The event valve is the connection through which operator events are sent, from coordinator to
+ * operator. It can temporarily block events from going through, buffering them, and releasing them
+ * later.
+ *
+ * <p>The valve can also drop buffered events for all or selected targets.
+ *
+ * <p>This class is fully thread safe, under the assumption that the event sender is thread-safe.
+ */
+public final class OperatorEventValve {
+
+	private final Object lock = new Object();
+
+	@GuardedBy("lock")
+	private final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender;
+
+	@GuardedBy("lock")
+	private final Map<Integer, List<BlockedEvent>> blockedEvents = new LinkedHashMap<>();
+
+	@GuardedBy("lock")
+	private boolean shut;
+
+	/**
+	 * Constructs a new OperatorEventValve, passing the events to the given function when the valve is open
+	 * or opened again. The second parameter of the BiFunction is the target operator subtask index.
+	 */
+	public OperatorEventValve(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender) {
+		this.eventSender = eventSender;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public boolean isShut() {
+		// synchronized block for visibility
+		synchronized (lock) {
+			return shut;
+		}
+	}
+
+	/**
+	 * Sends the event directly if the valve is open, and returns the original sending result future.
+	 *
+	 * <p>If the valve is closed, this buffers the event and returns an incomplete future. The future is completed
+	 * with the original result once the valve is opened. If the event is never sent (because it gets dropped
+	 * through a call to {@link #reset()} or {@link #resetForTask(int)}), then the returned future will be
+	 * completed exceptionally.
+	 */
+	public CompletableFuture<Acknowledge> sendEvent(SerializedValue<OperatorEvent> event, int subtask) {
+		synchronized (lock) {
+			if (!shut) {
+				return eventSender.apply(event, subtask);
+			}
+
+			final List<BlockedEvent> eventsForTask = blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>());
+			final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+			eventsForTask.add(new BlockedEvent(event, subtask, future));
+			return future;
+		}
+	}
+
+	/**
+	 * Shuts the valve. All events sent through this valve are blocked until the valve is re-opened.
+	 * If the valve is already shut, this does nothing.
+	 */
+	public void shutValve() {
+		// synchronized block for visibility
+		synchronized (lock) {
+			shut = true;
+		}
+	}
+
+	/**
+	 * Opens the valve, releasing all buffered events.
+	 */
+	public void openValve() {
+		final ArrayList<FuturePair> futures;
+
+		// send all events under lock, so that no new event can sneak between
+		synchronized (lock) {
+			if (!shut) {
+				return;
+			}
+
+			futures = new ArrayList<>(blockedEvents.size());
+
+			for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+				for (BlockedEvent blockedEvent : eventsForTask) {
+					final CompletableFuture<Acknowledge> ackFuture = eventSender.apply(blockedEvent.event, blockedEvent.subtask);
+					futures.add(new FuturePair(blockedEvent.future, ackFuture));
+				}
+			}
+			blockedEvents.clear();
+			shut = false;
+		}
+
+		// apply the logic on the future outside the lock, to be safe
+		for (FuturePair pair : futures) {
+			final CompletableFuture<Acknowledge> originalFuture = pair.originalFuture;
+			pair.ackFuture.whenComplete((success, failure) -> {
+				if (failure != null) {
+					originalFuture.completeExceptionally(failure);
+				} else {
+					originalFuture.complete(success);
+				}
+			});
+		}
+	}
+
+	/**
+	 * Drops all blocked events for a specific subtask.
+	 */
+	public void resetForTask(int subtask) {
+		final List<BlockedEvent> events;
+		synchronized (lock) {
+			events = blockedEvents.remove(subtask);
+		}
+
+		failAllFutures(events);
+	}
+
+	/**
+	 * Resets the valve, dropping all blocked events and opening the valve.
+	 */
+	public void reset() {
+		final List<BlockedEvent> events = new ArrayList<>();
+		synchronized (lock) {
+			for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+				if (taskEvents != null) {
+					events.addAll(taskEvents);
+				}
+			}
+			blockedEvents.clear();
+			shut = false;
+		}
+
+		failAllFutures(events);
+	}
+
+	private static void failAllFutures(@Nullable List<BlockedEvent> events) {
+		if (events == null || events.isEmpty()) {
+			return;
+		}
+
+		final Exception failureCause = new FlinkException("Event discarded due to failure of target task");
+		for (BlockedEvent evt : events) {
+			evt.future.completeExceptionally(failureCause);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class BlockedEvent {
+
+		final SerializedValue<OperatorEvent> event;
+		final CompletableFuture<Acknowledge> future;
+		final int subtask;
+
+		BlockedEvent(SerializedValue<OperatorEvent> event, int subtask, CompletableFuture<Acknowledge> future) {
+			this.event = event;
+			this.future = future;
+			this.subtask = subtask;
+		}
+	}
+
+	private static final class FuturePair {
+
+		final CompletableFuture<Acknowledge> originalFuture;
+		final CompletableFuture<Acknowledge> ackFuture;
+
+		FuturePair(CompletableFuture<Acknowledge> originalFuture, CompletableFuture<Acknowledge> ackFuture) {
+			this.originalFuture = originalFuture;
+			this.ackFuture = ackFuture;
+		}
+	}
+}
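
A minimal usage sketch of the valve semantics (illustrative only, not part of this commit); the no-op sender lambda is a placeholder for the real RPC-based sender that the holder wires in:

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.runtime.messages.Acknowledge;
    import org.apache.flink.runtime.operators.coordination.OperatorEvent;
    import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
    import org.apache.flink.util.SerializedValue;

    final class OperatorEventValveUsageSketch {

        static void demo(SerializedValue<OperatorEvent> event) {
            // placeholder sender: acknowledge immediately instead of sending to a task over RPC
            final OperatorEventValve valve = new OperatorEventValve(
                    (serializedEvent, subtask) -> CompletableFuture.completedFuture(Acknowledge.get()));

            // open valve: the event goes straight through to the sender
            valve.sendEvent(event, 0);

            // shut valve (e.g. once the coordinator state future completes): events are buffered
            valve.shutValve();
            final CompletableFuture<Acknowledge> pending = valve.sendEvent(event, 0);

            // open valve (e.g. after the source barriers were injected): buffered events are sent,
            // and 'pending' completes with the result of the actual send
            valve.openValve();
        }
    }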
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 6f2ae47..ce1d8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -962,7 +962,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 			throw new FlinkException("Coordinator of operator " + operator + " does not exist");
 		}
 
-		final OperatorCoordinator coordinator = coordinatorHolder.getCoordinator();
+		final OperatorCoordinator coordinator = coordinatorHolder.coordinator();
 		if (coordinator instanceof CoordinationRequestHandler) {
 			return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
 		} else {
@@ -1002,7 +1002,7 @@ public abstract class SchedulerBase implements SchedulerNG {
 		Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
 		for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
 			for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
-				coordinatorMap.put(holder.getOperatorId(), holder);
+				coordinatorMap.put(holder.operatorId(), holder);
 			}
 		}
 		return coordinatorMap;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index ee292b0..b454633 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -563,11 +563,7 @@ public class PendingCheckpointTest {
 	}
 
 	private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
-		return new OperatorCoordinatorCheckpointContext(
-				new MockOperatorCoordinator(),
-				new OperatorID(),
-				256,
-				50);
+		return new MockOperatorCoordinatorCheckpointContext();
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
new file mode 100644
index 0000000..919e6fe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration Test case that validates the exactly-once mechanism for coordinator events
+ * around checkpoints.
+ *
+ * <p>The test provokes the corner cases of the mechanism described in {@link OperatorCoordinatorHolder}.
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>The test generates two sequences of events from two Operator Coordinators to two operators (tasks).
+ * The event sequences are sent at different speeds, and the coordinators complete their checkpoints with
+ * different delays. Both coordinators inject failures at different points.
+ */
+@SuppressWarnings("serial")
+public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
+
+	private static final ConfigOption<String> ACC_NAME = ConfigOptions.key("acc").stringType().noDefaultValue();
+	private static final String OPERATOR_1_ACCUMULATOR = "op-acc-1";
+	private static final String OPERATOR_2_ACCUMULATOR = "op-acc-2";
+
+	private static MiniCluster miniCluster;
+
+	@BeforeClass
+	public static void startMiniCluster() throws Exception {
+		final Configuration config = new Configuration();
+		config.setString(RestOptions.BIND_PORT, "0");
+
+		final MiniClusterConfiguration clusterCfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(2)
+			.setNumSlotsPerTaskManager(1)
+			.setConfiguration(config)
+			.build();
+
+		miniCluster = new MiniCluster(clusterCfg);
+		miniCluster.start();
+	}
+
+	@AfterClass
+	public static void shutdownMiniCluster() throws Exception {
+		miniCluster.close();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void test() throws Exception {
+		final int numEvents1 = 200;
+		final int numEvents2 = 5;
+		final int delay1 = 1;
+		final int delay2 = 200;
+
+		final JobVertex task1 = buildJobVertex("TASK_1", numEvents1, delay1, OPERATOR_1_ACCUMULATOR);
+		final JobVertex task2 = buildJobVertex("TASK_2", numEvents2, delay2, OPERATOR_2_ACCUMULATOR);
+		final JobGraph jobGraph = new JobGraph("Coordinator Events Job", task1, task2);
+		jobGraph.setSnapshotSettings(createCheckpointSettings(task1, task2));
+
+		final JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
+
+		checkListContainsSequence(result.getAccumulatorResult(OPERATOR_2_ACCUMULATOR), numEvents2);
+		checkListContainsSequence(result.getAccumulatorResult(OPERATOR_1_ACCUMULATOR), numEvents1);
+	}
+
+	private static void checkListContainsSequence(List<Integer> ints, int length) {
+		if (ints.size() != length) {
+			failList(ints, length);
+		}
+
+		int nextExpected = 0;
+		for (int next : ints) {
+			if (next != nextExpected++) {
+				failList(ints, length);
+			}
+		}
+	}
+
+	private static void failList(List<Integer> ints, int length) {
+		fail("List did not contain expected sequence of " + length + " elements, but was: " + ints);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test setup helpers
+	// ------------------------------------------------------------------------
+
+	private static JobVertex buildJobVertex(String name, int numEvents, int delay, String accName) throws IOException {
+		final JobVertex vertex = new JobVertex(name);
+		final OperatorID opId = OperatorID.fromJobVertexID(vertex.getID());
+
+		vertex.setParallelism(1);
+		vertex.setInvokableClass(EventCollectingTask.class);
+		vertex.getConfiguration().setString(ACC_NAME, accName);
+
+		final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
+
+			@Override
+			public OperatorID getOperatorId() {
+				return opId;
+			}
+
+			@Override
+			public OperatorCoordinator create(OperatorCoordinator.Context context) {
+				return new EventSendingCoordinator(context, numEvents, delay);
+			}
+		};
+
+		vertex.addOperatorCoordinator(new SerializedValue<>(provider));
+
+		return vertex;
+	}
+
+	private static JobCheckpointingSettings createCheckpointSettings(JobVertex... vertices) {
+		final List<JobVertexID> ids = Arrays.stream(vertices)
+				.map(JobVertex::getID)
+				.collect(Collectors.toList());
+
+		final CheckpointCoordinatorConfiguration coordCfg =
+			new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
+				.setMaxConcurrentCheckpoints(1)
+				.setCheckpointInterval(10)
+				.setCheckpointTimeout(100_000)
+				.build();
+
+		return new JobCheckpointingSettings(ids, ids, ids, coordCfg, null);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test operator and coordinator implementations
+	// ------------------------------------------------------------------------
+
+	private static final class StartEvent implements OperatorEvent {}
+
+	private static final class EndEvent implements OperatorEvent {}
+
+	private static final class IntegerEvent implements OperatorEvent {
+
+		final int value;
+
+		IntegerEvent(int value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return "IntegerEvent " + value;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class EventSendingCoordinator implements OperatorCoordinator, Runnable {
+
+		private final Context context;
+
+		private ScheduledExecutorService executor;
+		private volatile ScheduledFuture<?> periodicTask;
+
+		private final int delay;
+		private final int maxNumber;
+		private int nextNumber;
+
+		private volatile CompletableFuture<byte[]> requestedCheckpoint;
+		private CompletableFuture<byte[]> nextToComplete;
+
+		private final int failAtMessage;
+		private boolean failedBefore;
+
+		private EventSendingCoordinator(Context context, int numEvents, int delay) {
+			checkArgument(delay > 0);
+			checkArgument(numEvents >= 3);
+
+			this.context = context;
+			this.maxNumber = numEvents;
+			this.delay = delay;
+			this.executor = Executors.newSingleThreadScheduledExecutor();
+
+			this.failAtMessage = numEvents / 3 + new Random().nextInt(numEvents / 3);
+		}
+
+		@Override
+		public void start() throws Exception {}
+
+		@Override
+		public void close() throws Exception {
+			executor.shutdownNow();
+			executor.awaitTermination(10, TimeUnit.SECONDS);
+		}
+
+		@Override
+		public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+			if (subtask != 0 || !(event instanceof StartEvent)) {
+				throw new Exception(String.format("Don't recognize event '%s' from task %d.", event, subtask));
+			}
+
+			if (periodicTask != null) {
+				throw new Exception("periodic already running");
+			}
+			periodicTask = executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+		}
+
+		@Override
+		public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+			periodicTask.cancel(false);
+			periodicTask = null;
+			executor.execute(() -> nextNumber = 0);
+		}
+
+		@Override
+		public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+			executor.execute(() -> nextNumber = bytesToInt(checkpointData));
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+			final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
+			requestedCheckpoint = checkpointFuture;
+			return checkpointFuture;
+		}
+
+		@Override
+		public void checkpointComplete(long checkpointId) {}
+
+		@SuppressWarnings("CallToPrintStackTrace")
+		@Override
+		public void run() {
+			try {
+				handleCheckpoint();
+				sendNextEvent();
+				checkWhetherToTriggerFailure();
+			} catch (Throwable t) {
+				// this is so that exceptions thrown in the scheduled executor don't just freeze the test
+				t.printStackTrace();
+				System.exit(-1);
+			}
+		}
+
+		private void handleCheckpoint() {
+			// we move the checkpoint one further so it completes after the next delay
+			if (nextToComplete != null) {
+				final int numToCheckpoint = Math.min(nextNumber, maxNumber);
+				nextToComplete.complete(intToBytes(numToCheckpoint));
+				nextToComplete = null;
+			}
+			if (requestedCheckpoint != null) {
+				nextToComplete = requestedCheckpoint;
+				requestedCheckpoint = null;
+			}
+		}
+
+		private void sendNextEvent() {
+			if (nextNumber > maxNumber) {
+				return;
+			}
+			try {
+				if (nextNumber == maxNumber) {
+					context.sendEvent(new EndEvent(), 0);
+				} else {
+					context.sendEvent(new IntegerEvent(nextNumber), 0);
+				}
+				nextNumber++;
+			} catch (TaskNotRunningException ignored) {}
+		}
+
+		private void checkWhetherToTriggerFailure() {
+			if (nextNumber >= failAtMessage && !failedBefore) {
+				failedBefore = true;
+				context.failJob(new Exception("test failure"));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The runtime task that receives the events and accumulates the numbers. The task is stateful
+	 * and checkpoints the accumulator.
+	 */
+	public static final class EventCollectingTask extends AbstractInvokable {
+
+		private final OperatorID operatorID;
+		private final String accumulatorName;
+		private final LinkedBlockingQueue<Object> actions;
+
+		private volatile boolean running = true;
+
+		public EventCollectingTask(Environment environment) {
+			super(environment);
+			this.operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
+			this.accumulatorName = environment.getTaskConfiguration().get(ACC_NAME);
+			this.actions = new LinkedBlockingQueue<>();
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final ArrayList<Integer> collectedInts = new ArrayList<>();
+			restoreState(collectedInts);
+
+			// signal the coordinator to start
+			getEnvironment().getOperatorCoordinatorEventGateway()
+					.sendOperatorEventToCoordinator(operatorID, new SerializedValue<>(new StartEvent()));
+
+			// poor-man's mailbox
+			Object next;
+			while (running && !((next = actions.take()) instanceof EndEvent)) {
+				if (next instanceof IntegerEvent) {
+					collectedInts.add(((IntegerEvent) next).value);
+				} else if (next instanceof CheckpointMetaData) {
+					takeCheckpoint(((CheckpointMetaData) next).getCheckpointId(), collectedInts);
+				} else {
+					throw new Exception("Unrecognized: " + next);
+				}
+			}
+
+			if (running) {
+				final ListAccumulator<Integer> acc = new ListAccumulator<>();
+				collectedInts.forEach(acc::add);
+				getEnvironment().getAccumulatorRegistry().getUserMap().put(accumulatorName, acc);
+			}
+		}
+
+		@Override
+		public void cancel() throws Exception {
+			running = false;
+		}
+
+		@Override
+		public Future<Boolean> triggerCheckpointAsync(
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				boolean advanceToEndOfEventTime) {
+			actions.add(checkpointMetaData); // this signals the main thread should do a checkpoint
+			return CompletableFuture.completedFuture(true);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event) throws FlinkException {
+			try {
+				final OperatorEvent opEvent = event.deserializeValue(getUserCodeClassLoader());
+				actions.add(opEvent);
+			} catch (IOException | ClassNotFoundException e) {
+				throw new FlinkException(e);
+			}
+		}
+
+		private void takeCheckpoint(long checkpointId, List<Integer> state) throws Exception {
+			final StreamStateHandle handle = stateToHandle(state);
+			final TaskStateSnapshot snapshot = createSnapshot(handle, operatorID);
+			getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), snapshot);
+		}
+
+		private void restoreState(List<Integer> target) throws Exception {
+			final StreamStateHandle stateHandle = readSnapshot(getEnvironment().getTaskStateManager(), operatorID);
+			if (stateHandle != null) {
+				final List<Integer> list = handleToState(stateHandle);
+				target.addAll(list);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  serialization shenanigans
+	// ------------------------------------------------------------------------
+
+	static byte[] intToBytes(int value) {
+		final byte[] bytes = new byte[4];
+		ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putInt(0, value);
+		return bytes;
+	}
+
+	static int bytesToInt(byte[] bytes) {
+		assertEquals(4, bytes.length);
+		return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(0);
+	}
+
+	static ByteStreamStateHandle stateToHandle(List<Integer> state) throws IOException {
+		final byte[] bytes = InstantiationUtil.serializeObject(state);
+		return new ByteStreamStateHandle("state", bytes);
+	}
+
+	static List<Integer> handleToState(StreamStateHandle handle) throws IOException, ClassNotFoundException {
+		final ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) handle;
+		return InstantiationUtil.deserializeObject(byteHandle.getData(), EventCollectingTask.class.getClassLoader());
+	}
+
+	static TaskStateSnapshot createSnapshot(StreamStateHandle handle, OperatorID operatorId) {
+		final OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(
+			new long[]{0}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
+
+		final OperatorStateHandle state = new OperatorStreamStateHandle(
+			Collections.singletonMap("état_et_moi_:_ça_fait_deux", metaInfo), handle);
+
+		final OperatorSubtaskState oss = new OperatorSubtaskState(
+			StateObjectCollection.singleton(state), StateObjectCollection.empty(),
+			StateObjectCollection.empty(), StateObjectCollection.empty());
+		return new TaskStateSnapshot(Collections.singletonMap(operatorId, oss));
+	}
+
+	@Nullable
+	static StreamStateHandle readSnapshot(TaskStateManager stateManager, OperatorID operatorId) {
+		final PrioritizedOperatorSubtaskState poss = stateManager.prioritizedOperatorState(operatorId);
+		if (!poss.isRestored()) {
+			return null;
+		}
+
+		final StateObjectCollection<OperatorStateHandle> opState =
+			stateManager.prioritizedOperatorState(operatorId).getPrioritizedManagedOperatorState().get(0);
+		final OperatorStateHandle handle = Iterators.getOnlyElement(opState.iterator());
+		return handle.getDelegateStateHandle();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
index 16acbb2..7cab82f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
@@ -16,61 +16,65 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-
-import java.util.Collection;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
  */
-public final class OperatorCoordinatorCheckpointContext {
-
-	private final OperatorCoordinator coordinator;
+public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
 
 	private final OperatorID operatorId;
-
+	private final OperatorCoordinator coordinator;
+	private final int parallelism;
 	private final int maxParallelism;
 
-	private final int currentParallelism;
+	public MockOperatorCoordinatorCheckpointContext() {
+		this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+	}
 
-	public OperatorCoordinatorCheckpointContext(
-			OperatorCoordinator coordinator,
+	public MockOperatorCoordinatorCheckpointContext(
 			OperatorID operatorId,
-			int maxParallelism,
-			int currentParallelism) {
-
-		this.coordinator = checkNotNull(coordinator);
-		this.operatorId = checkNotNull(operatorId);
+			OperatorCoordinator coordinator,
+			int parallelism,
+			int maxParallelism) {
+		this.operatorId = operatorId;
+		this.coordinator = coordinator;
+		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
-		this.currentParallelism = currentParallelism;
 	}
 
+	@Override
 	public OperatorCoordinator coordinator() {
 		return coordinator;
 	}
 
+	@Override
 	public OperatorID operatorId() {
 		return operatorId;
 	}
 
+	@Override
 	public int maxParallelism() {
 		return maxParallelism;
 	}
 
+	@Override
 	public int currentParallelism() {
-		return currentParallelism;
+		return parallelism;
 	}
 
-	public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
-		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
-			.collect(Collectors.toList());
-	}
+	@Override
+	public void onCallTriggerCheckpoint(long checkpointId) {}
+
+	@Override
+	public void onCheckpointStateFutureComplete(long checkpointId) {}
+
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {}
+
+	@Override
+	public void abortCurrentTriggering() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 14a136d..8e5404a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -505,11 +505,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final Optional<OperatorCoordinatorHolder> coordinatorOptional = vertexWithCoordinator
 				.getOperatorCoordinators()
 				.stream()
-				.filter((holder) -> holder.getOperatorId().equals(testOperatorId))
+				.filter((holder) -> holder.operatorId().equals(testOperatorId))
 				.findFirst();
 		assertTrue("vertex does not contain coordinator", coordinatorOptional.isPresent());
 
-		final OperatorCoordinator coordinator = coordinatorOptional.get().getCoordinator();
+		final OperatorCoordinator coordinator = coordinatorOptional.get().coordinator();
 		assertThat(coordinator, instanceOf(TestingOperatorCoordinator.class));
 
 		return (TestingOperatorCoordinator) coordinator;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
new file mode 100644
index 0000000..9b0325d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventValveTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.TestEventSender.EventWithSubtask;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link OperatorEventValve}.
+ */
+public class OperatorEventValveTest {
+
+	@Test
+	public void eventsPassThroughOpenValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		final OperatorEvent event = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future = valve.sendEvent(new SerializedValue<>(event), 11);
+
+		assertThat(sender.events, contains(new EventWithSubtask(event, 11)));
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void eventsBlockedByClosedValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final CompletableFuture<Acknowledge> future =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		assertTrue(sender.events.isEmpty());
+		assertFalse(future.isDone());
+	}
+
+	@Test
+	public void eventsReleasedAfterOpeningValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final OperatorEvent event1 = new TestOperatorEvent();
+		final OperatorEvent event2 = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 3);
+		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 0);
+
+		valve.openValve();
+
+		assertThat(sender.events, containsInAnyOrder(
+			new EventWithSubtask(event1, 3),
+			new EventWithSubtask(event2, 0)
+		));
+		assertTrue(future1.isDone());
+		assertTrue(future2.isDone());
+	}
+
+	@Test
+	public void releasedEventsForwardSendFailures() throws Exception {
+		final TestEventSender sender = new TestEventSender(new FlinkException("test"));
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final CompletableFuture<Acknowledge> future =
+				valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 10);
+		valve.openValve();
+
+		assertTrue(future.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetDropsAllEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 0);
+		valve.sendEvent(new SerializedValue<>(new TestOperatorEvent()), 1);
+
+		valve.reset();
+		valve.openValve();
+
+		assertTrue(sender.events.isEmpty());
+	}
+
+	@Test
+	public void resetForTaskDropsSelectiveEvents() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+		valve.shutValve();
+
+		final OperatorEvent event1 = new TestOperatorEvent();
+		final OperatorEvent event2 = new TestOperatorEvent();
+		final CompletableFuture<Acknowledge> future1 = valve.sendEvent(new SerializedValue<>(event1), 0);
+		final CompletableFuture<Acknowledge> future2 = valve.sendEvent(new SerializedValue<>(event2), 1);
+
+		valve.resetForTask(1);
+		valve.openValve();
+
+		assertThat(sender.events, contains(new EventWithSubtask(event1, 0)));
+		assertTrue(future1.isDone());
+		assertTrue(future2.isCompletedExceptionally());
+	}
+
+	@Test
+	public void resetOpensValve() throws Exception {
+		final TestEventSender sender = new TestEventSender();
+		final OperatorEventValve valve = new OperatorEventValve(sender);
+
+		valve.shutValve();
+		valve.reset();
+
+		assertFalse(valve.isShut());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
new file mode 100644
index 0000000..8b93dea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestEventSender.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+
+/**
+ * A test implementation of the BiFunction interface used as the underlying event sender in the
+ * {@link OperatorCoordinatorHolder}.
+ */
+final class TestEventSender
+		implements BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> {
+
+	final ArrayList<EventWithSubtask> events = new ArrayList<>();
+
+	@Nullable
+	private final Throwable failureCause;
+
+	/**
+	 * Creates a sender that collects events and acknowledges all events successfully.
+	 */
+	TestEventSender() {
+		this(null);
+	}
+
+	/**
+	 * Creates a sender that collects events and fails all the send-futures with the given exception,
+	 * if it is non-null.
+	 */
+	TestEventSender(@Nullable Throwable failureCause) {
+		this.failureCause = failureCause;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> apply(SerializedValue<OperatorEvent> event, Integer subtask) {
+		final OperatorEvent deserializedEvent;
+		try {
+			deserializedEvent = event.deserializeValue(getClass().getClassLoader());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new AssertionError(e);
+		}
+		events.add(new EventWithSubtask(deserializedEvent, subtask));
+
+		return failureCause == null
+				? CompletableFuture.completedFuture(Acknowledge.get())
+				: FutureUtils.completedExceptionally(failureCause);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A combination of an {@link OperatorEvent} and the target subtask it is sent to.
+	 */
+	static final class EventWithSubtask {
+
+		final OperatorEvent event;
+		final int subtask;
+
+		EventWithSubtask(OperatorEvent event, int subtask) {
+			this.event = event;
+			this.subtask = subtask;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final EventWithSubtask that = (EventWithSubtask) o;
+			return subtask == that.subtask &&
+				event.equals(that.event);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(event, subtask);
+		}
+
+		@Override
+		public String toString() {
+			return event + " => subtask " + subtask;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
index 6eb600b..8622143 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
@@ -23,4 +23,37 @@ package org.apache.flink.runtime.operators.coordination;
  */
 public final class TestOperatorEvent implements OperatorEvent {
 	private static final long serialVersionUID = 1L;
+
+	private final int value;
+
+	public TestOperatorEvent() {
+		// pick some random and rather unique value
+		this.value = System.identityHashCode(this);
+	}
+
+	public TestOperatorEvent(int value) {
+		this.value = value;
+	}
+
+	public int getValue() {
+		return value;
+	}
+
+	@Override
+	public int hashCode() {
+		return value;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this || (
+				obj != null &&
+				obj.getClass() == TestOperatorEvent.class &&
+					((TestOperatorEvent) obj).value == this.value);
+	}
+
+	@Override
+	public String toString() {
+		return "TestOperatorEvent (" + value + ')';
+	}
 }