Posted to commits@flink.apache.org by se...@apache.org on 2020/03/05 14:52:31 UTC

[flink] 06/06: [FLINK-16177][checkpointing] Integrate OperatorCoordinator with checkpoints (triggering and committing)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 963974f99d18d6a9f36fa78b792dcc2bc9e53de5
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 27 15:36:39 2020 +0100

    [FLINK-16177][checkpointing] Integrate OperatorCoordinator with checkpoints (triggering and committing)
    
    Restoring state to OperatorCoordinators is not included in this commit.
    
    This closes #11274
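    
    For illustration, the coordinator side of this integration boils down to producing the
    checkpoint state as a byte[] future when triggered, and being notified once the checkpoint
    is committed. Below is a minimal sketch against the OperatorCoordinator interface as of this
    commit; the CountingCoordinator class and the counter it snapshots are invented for the
    example and are not part of the change.
    
    package org.apache.flink.runtime.operators.coordination;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;
    
    /** A minimal sketch of a coordinator taking part in checkpoints (example only). */
    public final class CountingCoordinator implements OperatorCoordinator {
    
        private long eventCount;
    
        @Override
        public void start() {}
    
        @Override
        public void close() {}
    
        @Override
        public void handleEventFromOperator(int subtask, OperatorEvent event) {
            eventCount++;
        }
    
        @Override
        public void subtaskFailed(int subtask) {}
    
        @Override
        public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
            // triggering: hand the coordinator state to the CheckpointCoordinator as bytes
            return CompletableFuture.completedFuture(
                    ByteBuffer.allocate(Long.BYTES).putLong(eventCount).array());
        }
    
        @Override
        public void checkpointComplete(long checkpointId) {
            // committing: the checkpoint is final at this point; this method must not throw
        }
    
        @Override
        public void resetToCheckpoint(byte[] checkpointData) {
            eventCount = ByteBuffer.wrap(checkpointData).getLong();
        }
    }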
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  34 ++++-
 .../OperatorCoordinatorCheckpointContext.java      |  76 +++++++++++
 .../checkpoint/OperatorCoordinatorCheckpoints.java | 144 +++++++++++++++++++++
 .../flink/runtime/checkpoint/OperatorState.java    |  27 +++-
 .../runtime/checkpoint/PendingCheckpoint.java      |  66 +++++++++-
 .../checkpoint/metadata/MetadataV3Serializer.java  |   7 +-
 .../runtime/executiongraph/ExecutionGraph.java     |  21 +++
 .../runtime/executiongraph/ExecutionJobVertex.java |  10 +-
 .../coordination/OperatorCoordinator.java          |  13 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |   1 +
 .../CheckpointCoordinatorTestingUtils.java         |   7 +
 .../FailoverStrategyCheckpointCoordinatorTest.java |   2 +
 .../runtime/checkpoint/PendingCheckpointTest.java  | 117 ++++++++++++++++-
 .../checkpoint/metadata/CheckpointTestUtils.java   |   6 +
 .../coordination/MockOperatorCoordinator.java      |  63 +++++++++
 .../runtime/state/TestingStreamStateHandle.java    |  72 +++++++++++
 16 files changed, 643 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 070e00c..4e23df3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -55,6 +55,8 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -110,6 +112,9 @@ public class CheckpointCoordinator {
 	/** Tasks who need to be sent a message when a checkpoint is confirmed. */
 	private final ExecutionVertex[] tasksToCommitTo;
 
+	/** The operator coordinators that need to be checkpointed. */
+	private final Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint;
+
 	/** Map from checkpoint ID to the pending checkpoint. */
 	private final Map<Long, PendingCheckpoint> pendingCheckpoints;
 
@@ -203,6 +208,7 @@ public class CheckpointCoordinator {
 		ExecutionVertex[] tasksToTrigger,
 		ExecutionVertex[] tasksToWaitFor,
 		ExecutionVertex[] tasksToCommitTo,
+		Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
 		CheckpointIDCounter checkpointIDCounter,
 		CompletedCheckpointStore completedCheckpointStore,
 		StateBackend checkpointStateBackend,
@@ -217,6 +223,7 @@ public class CheckpointCoordinator {
 			tasksToTrigger,
 			tasksToWaitFor,
 			tasksToCommitTo,
+			coordinatorsToCheckpoint,
 			checkpointIDCounter,
 			completedCheckpointStore,
 			checkpointStateBackend,
@@ -234,6 +241,7 @@ public class CheckpointCoordinator {
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
+			Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			StateBackend checkpointStateBackend,
@@ -267,6 +275,7 @@ public class CheckpointCoordinator {
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
 		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
+		this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
 		this.pendingCheckpoints = new LinkedHashMap<>();
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
@@ -548,8 +557,16 @@ public class CheckpointCoordinator {
 							onCompletionPromise),
 						timer);
 
-			pendingCheckpointCompletableFuture
-				.thenCompose(this::snapshotMasterState)
+			final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
+					.thenCompose(this::snapshotMasterState);
+
+			final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
+					.thenComposeAsync((pendingCheckpoint) ->
+							OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+									coordinatorsToCheckpoint, pendingCheckpoint, timer),
+							timer);
+
+			CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
 				.whenCompleteAsync(
 					(ignored, throwable) -> {
 						final PendingCheckpoint checkpoint =
@@ -634,6 +651,7 @@ public class CheckpointCoordinator {
 			checkpointID,
 			timestamp,
 			ackTasks,
+			OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
 			masterHooks.keySet(),
 			props,
 			checkpointStorageLocation,
@@ -1080,15 +1098,23 @@ public class CheckpointCoordinator {
 			LOG.debug(builder.toString());
 		}
 
-		// send the "notify complete" call to all vertices
-		final long timestamp = completedCheckpoint.getTimestamp();
+		// send the "notify complete" call to all vertices, coordinators, etc.
+		sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
+	}
 
+	private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
+		// commit tasks
 		for (ExecutionVertex ev : tasksToCommitTo) {
 			Execution ee = ev.getCurrentExecutionAttempt();
 			if (ee != null) {
 				ee.notifyCheckpointComplete(checkpointId, timestamp);
 			}
 		}
+
+		// commit coordinators
+		for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
+			coordinatorContext.coordinator().checkpointComplete(checkpointId);
+		}
 	}
 
 	/**
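
The trigger path above derives two branches from the pending-checkpoint future, the master-state
snapshots and the coordinator checkpoints, and only finalizes once both have completed. Stripped of
the Flink-specific types, this is plain CompletableFuture fan-out/fan-in; the sketch below uses
placeholder strings for the snapshot results and a single-threaded executor in place of the
coordinator's timer.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public final class FanInSketch {
        public static void main(String[] args) {
            final ExecutorService timer = Executors.newSingleThreadExecutor();

            final CompletableFuture<String> pendingCheckpoint =
                    CompletableFuture.completedFuture("checkpoint-17");

            final CompletableFuture<?> masterStatesComplete =
                    pendingCheckpoint.thenCompose(cp ->
                            CompletableFuture.completedFuture(cp + " master state"));

            final CompletableFuture<?> coordinatorCheckpointsComplete =
                    pendingCheckpoint.thenComposeAsync(cp ->
                            CompletableFuture.completedFuture(cp + " coordinator snapshots"), timer);

            // the checkpoint only moves on once both branches are done (or one of them failed)
            CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
                    .whenCompleteAsync((ignored, failure) ->
                            System.out.println(failure == null
                                    ? "both branches acknowledged"
                                    : "failed: " + failure),
                            timer)
                    .join();

            timer.shutdown();
        }
    }
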
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
new file mode 100644
index 0000000..16acbb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link OperatorCoordinator} and its contextual information needed to trigger and
+ * acknowledge a checkpoint.
+ */
+public final class OperatorCoordinatorCheckpointContext {
+
+	private final OperatorCoordinator coordinator;
+
+	private final OperatorID operatorId;
+
+	private final int maxParallelism;
+
+	private final int currentParallelism;
+
+	public OperatorCoordinatorCheckpointContext(
+			OperatorCoordinator coordinator,
+			OperatorID operatorId,
+			int maxParallelism,
+			int currentParallelism) {
+
+		this.coordinator = checkNotNull(coordinator);
+		this.operatorId = checkNotNull(operatorId);
+		this.maxParallelism = maxParallelism;
+		this.currentParallelism = currentParallelism;
+	}
+
+	public OperatorCoordinator coordinator() {
+		return coordinator;
+	}
+
+	public OperatorID operatorId() {
+		return operatorId;
+	}
+
+	public int maxParallelism() {
+		return maxParallelism;
+	}
+
+	public int currentParallelism() {
+		return currentParallelism;
+	}
+
+	public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+		return infos.stream()
+			.map(OperatorCoordinatorCheckpointContext::operatorId)
+			.collect(Collectors.toList());
+	}
+}
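
As a usage sketch, the tests in this commit construct these contexts directly around the
MockOperatorCoordinator stub added below; the parallelism values here are arbitrary example numbers.

    import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
    import org.apache.flink.runtime.jobgraph.OperatorID;
    import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;

    import java.util.Collection;
    import java.util.Collections;

    public final class ContextUsageSketch {
        public static void main(String[] args) {
            final OperatorCoordinatorCheckpointContext context =
                    new OperatorCoordinatorCheckpointContext(
                            new MockOperatorCoordinator(),  // the coordinator to checkpoint
                            new OperatorID(),               // the operator it belongs to
                            256,                            // max parallelism (example value)
                            50);                            // current parallelism (example value)

            // the CheckpointCoordinator uses these IDs to track not-yet-acknowledged coordinators
            final Collection<OperatorID> ids =
                    OperatorCoordinatorCheckpointContext.getIds(Collections.singletonList(context));
            System.out.println(ids);
        }
    }
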
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
new file mode 100644
index 0000000..cfacec8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * All the logic related to taking checkpoints of the {@link OperatorCoordinator}s.
+ *
+ * <p>NOTE: This class has simplified error handling logic. If one of the several coordinator checkpoints
+ * fails, no cleanup is triggered for the other concurrent ones. That is okay, since they all produce just byte[]
+ * as the result. We have to change that once we allow them to create external resources that actually need
+ * to be cleaned up.
+ */
+final class OperatorCoordinatorCheckpoints {
+
+	public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
+			final OperatorCoordinatorCheckpointContext coordinatorInfo,
+			final long checkpointId) throws Exception {
+
+		final CompletableFuture<byte[]> checkpointFuture =
+				coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+
+		return checkpointFuture.thenApply(
+				(state) -> new CoordinatorSnapshot(
+						coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+		);
+	}
+
+	public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
+			final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+			final long checkpointId) throws Exception {
+
+		final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
+
+		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
+			individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+		}
+
+		return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
+	}
+
+	public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
+			final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+			final PendingCheckpoint checkpoint,
+			final Executor acknowledgeExecutor) throws Exception {
+
+		final CompletableFuture<AllCoordinatorSnapshots> snapshots =
+				triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
+
+		return snapshots
+				.thenAcceptAsync(
+						(allSnapshots) -> {
+							try {
+								acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
+							}
+							catch (Exception e) {
+								throw new CompletionException(e);
+							}
+						},
+						acknowledgeExecutor);
+	}
+
+	public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+			final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+			final PendingCheckpoint checkpoint,
+			final Executor acknowledgeExecutor) throws CompletionException {
+
+		try {
+			return triggerAndAcknowledgeAllCoordinatorCheckpoints(coordinators, checkpoint, acknowledgeExecutor);
+		} catch (Exception e) {
+			throw new CompletionException(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static void acknowledgeAllCoordinators(PendingCheckpoint checkpoint, Collection<CoordinatorSnapshot> snapshots) throws CheckpointException {
+		for (final CoordinatorSnapshot snapshot : snapshots) {
+			final PendingCheckpoint.TaskAcknowledgeResult result =
+				checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
+
+			if (result != PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
+				throw new CheckpointException("Coordinator state not acknowledged successfully: " + result,
+					CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	static final class AllCoordinatorSnapshots {
+
+		private final Collection<CoordinatorSnapshot> snapshots;
+
+		AllCoordinatorSnapshots(Collection<CoordinatorSnapshot> snapshots) {
+			this.snapshots = snapshots;
+		}
+
+		public Iterable<CoordinatorSnapshot> snapshots() {
+			return snapshots;
+		}
+	}
+
+	static final class CoordinatorSnapshot {
+
+		final OperatorCoordinatorCheckpointContext coordinator;
+		final StreamStateHandle state;
+
+		CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, StreamStateHandle state) {
+			// if this is not true any more, we need more elaborate dispose/cleanup handling
+			// see comment above the class.
+			assert state instanceof ByteStreamStateHandle;
+
+			this.coordinator = coordinator;
+			this.state = state;
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index de011c1..998a3bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Simple container class which contains the raw/managed operator state and key-group state handles from all sub
  * tasks of an operator and therefore represents the complete state of a logical operator.
@@ -43,6 +48,10 @@ public class OperatorState implements CompositeStateHandle {
 	/** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */
 	private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
 
+	/** The state of the operator coordinator. Null, if no such state exists. */
+	@Nullable
+	private StreamStateHandle coordinatorState;
+
 	/** The parallelism of the operator when it was checkpointed. */
 	private final int parallelism;
 
@@ -87,6 +96,16 @@ public class OperatorState implements CompositeStateHandle {
 		}
 	}
 
+	public void setCoordinatorState(@Nullable StreamStateHandle coordinatorState) {
+		checkState(this.coordinatorState == null, "coordinator state already set");
+		this.coordinatorState = coordinatorState;
+	}
+
+	@Nullable
+	public StreamStateHandle getCoordinatorState() {
+		return coordinatorState;
+	}
+
 	public Map<Integer, OperatorSubtaskState> getSubtaskStates() {
 		return Collections.unmodifiableMap(operatorSubtaskStates);
 	}
@@ -112,6 +131,10 @@ public class OperatorState implements CompositeStateHandle {
 		for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
 			operatorSubtaskState.discardState();
 		}
+
+		if (coordinatorState != null) {
+			coordinatorState.discardState();
+		}
 	}
 
 	@Override
@@ -123,7 +146,7 @@ public class OperatorState implements CompositeStateHandle {
 
 	@Override
 	public long getStateSize() {
-		long result = 0L;
+		long result = coordinatorState == null ? 0L : coordinatorState.getStateSize();
 
 		for (int i = 0; i < parallelism; i++) {
 			OperatorSubtaskState operatorSubtaskState = operatorSubtaskStates.get(i);
@@ -142,6 +165,7 @@ public class OperatorState implements CompositeStateHandle {
 
 			return operatorID.equals(other.operatorID)
 				&& parallelism == other.parallelism
+				&& Objects.equals(coordinatorState, other.coordinatorState)
 				&& operatorSubtaskStates.equals(other.operatorSubtaskStates);
 		} else {
 			return false;
@@ -161,6 +185,7 @@ public class OperatorState implements CompositeStateHandle {
 			"operatorID: " + operatorID +
 			", parallelism: " + parallelism +
 			", maxParallelism: " + maxParallelism +
+			", coordinatorState: " + (coordinatorState == null ? "(none)" : coordinatorState.getStateSize() + " bytes") +
 			", sub task states: " + operatorSubtaskStates.size() +
 			", total size (bytes): " + getStateSize() +
 			')';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index ae9027d..27a8513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -88,6 +90,8 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+	private final Set<OperatorID> notYetAcknowledgedOperatorCoordinators;
+
 	private final List<MasterState> masterStates;
 
 	private final Set<String> notYetAcknowledgedMasterStates;
@@ -126,6 +130,7 @@ public class PendingCheckpoint {
 			long checkpointId,
 			long checkpointTimestamp,
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+			Collection<OperatorID> operatorCoordinatorsToConfirm,
 			Collection<String> masterStateIdentifiers,
 			CheckpointProperties props,
 			CheckpointStorageLocation targetLocation,
@@ -145,7 +150,10 @@ public class PendingCheckpoint {
 
 		this.operatorStates = new HashMap<>();
 		this.masterStates = new ArrayList<>(masterStateIdentifiers.size());
-		this.notYetAcknowledgedMasterStates = new HashSet<>(masterStateIdentifiers);
+		this.notYetAcknowledgedMasterStates = masterStateIdentifiers.isEmpty()
+				? Collections.emptySet() : new HashSet<>(masterStateIdentifiers);
+		this.notYetAcknowledgedOperatorCoordinators = operatorCoordinatorsToConfirm.isEmpty()
+				? Collections.emptySet() : new HashSet<>(operatorCoordinatorsToConfirm);
 		this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
 		this.onCompletionPromise = checkNotNull(onCompletionPromise);
 	}
@@ -176,6 +184,10 @@ public class PendingCheckpoint {
 		return notYetAcknowledgedTasks.size();
 	}
 
+	public int getNumberOfNonAcknowledgedOperatorCoordinators() {
+		return notYetAcknowledgedOperatorCoordinators.size();
+	}
+
 	public int getNumberOfAcknowledgedTasks() {
 		return numAcknowledgedTasks;
 	}
@@ -188,11 +200,21 @@ public class PendingCheckpoint {
 		return masterStates;
 	}
 
-	public boolean areMasterStatesFullyAcknowledged() {
+	public boolean isFullyAcknowledged() {
+		return areTasksFullyAcknowledged() &&
+			areCoordinatorsFullyAcknowledged() &&
+			areMasterStatesFullyAcknowledged();
+	}
+
+	boolean areMasterStatesFullyAcknowledged() {
 		return notYetAcknowledgedMasterStates.isEmpty() && !discarded;
 	}
 
-	public boolean areTasksFullyAcknowledged() {
+	boolean areCoordinatorsFullyAcknowledged() {
+		return notYetAcknowledgedOperatorCoordinators.isEmpty() && !discarded;
+	}
+
+	boolean areTasksFullyAcknowledged() {
 		return notYetAcknowledgedTasks.isEmpty() && !discarded;
 	}
 
@@ -270,10 +292,8 @@ public class PendingCheckpoint {
 	public CompletedCheckpoint finalizeCheckpoint() throws IOException {
 
 		synchronized (lock) {
-			checkState(areMasterStatesFullyAcknowledged(),
-				"Pending checkpoint has not been fully acknowledged by master states yet.");
-			checkState(areTasksFullyAcknowledged(),
-				"Pending checkpoint has not been fully acknowledged by tasks yet.");
+			checkState(!isDiscarded(), "checkpoint is discarded");
+			checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");
 
 			// make sure we fulfill the promise with an exception if something fails
 			try {
@@ -410,6 +430,38 @@ public class PendingCheckpoint {
 		}
 	}
 
+	public TaskAcknowledgeResult acknowledgeCoordinatorState(
+			OperatorCoordinatorCheckpointContext coordinatorInfo,
+			@Nullable StreamStateHandle stateHandle) {
+
+		synchronized (lock) {
+			if (discarded) {
+				return TaskAcknowledgeResult.DISCARDED;
+			}
+
+			final OperatorID operatorId = coordinatorInfo.operatorId();
+			OperatorState operatorState = operatorStates.get(operatorId);
+
+			// sanity check for better error reporting
+			if (!notYetAcknowledgedOperatorCoordinators.remove(operatorId)) {
+				return operatorState != null && operatorState.getCoordinatorState() != null
+						? TaskAcknowledgeResult.DUPLICATE
+						: TaskAcknowledgeResult.UNKNOWN;
+			}
+
+			if (stateHandle != null) {
+				if (operatorState == null) {
+					operatorState = new OperatorState(
+						operatorId, coordinatorInfo.currentParallelism(), coordinatorInfo.maxParallelism());
+					operatorStates.put(operatorId, operatorState);
+				}
+				operatorState.setCoordinatorState(stateHandle);
+			}
+
+			return TaskAcknowledgeResult.SUCCESS;
+		}
+	}
+
 	/**
 	 * Acknowledges a master state (state generated on the checkpoint coordinator) to
 	 * the pending checkpoint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index e304ef4..7db1255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -81,6 +81,9 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		dos.writeInt(operatorState.getParallelism());
 		dos.writeInt(operatorState.getMaxParallelism());
 
+		// Coordinator state
+		serializeStreamStateHandle(operatorState.getCoordinatorState(), dos);
+
 		// Sub task states
 		final Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
 		dos.writeInt(subtaskStateMap.size());
@@ -96,9 +99,11 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		final int parallelism = dis.readInt();
 		final int maxParallelism = dis.readInt();
 
-		// Add task state
 		final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
 
+		// Coordinator state
+		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis));
+
 		// Sub task states
 		final int numSubTaskStates = dis.readInt();
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f8d79fa..09a6a3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
@@ -60,10 +61,12 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.scheduler.InternalFailuresListener;
 import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -457,6 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
 		ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
 
+		final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators = buildOpCoordinatorCheckpointContexts();
+
 		checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
 
 		CheckpointFailureManager failureManager = new CheckpointFailureManager(
@@ -487,6 +492,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			tasksToTrigger,
 			tasksToWaitFor,
 			tasksToCommitTo,
+			operatorCoordinators,
 			checkpointIDCounter,
 			checkpointStore,
 			checkpointStateBackend,
@@ -566,6 +572,21 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		}
 	}
 
+	private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
+		final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
+		for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
+			for (final Map.Entry<OperatorID, OperatorCoordinator> coordinator : vertex.getOperatorCoordinatorMap().entrySet()) {
+				contexts.add(new OperatorCoordinatorCheckpointContext(
+						coordinator.getValue(),
+						coordinator.getKey(),
+						vertex.getMaxParallelism(),
+						vertex.getParallelism()));
+			}
+		}
+		contexts.trimToSize();
+		return contexts;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Properties and Status of the Execution Graph
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 4b60652..44d33fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -249,10 +249,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 
 		try {
-			this.operatorCoordinators = OperatorCoordinatorUtil.instantiateCoordinators(
+			final Map<OperatorID, OperatorCoordinator> coordinators = OperatorCoordinatorUtil.instantiateCoordinators(
 					jobVertex.getOperatorCoordinators(),
 					graph.getUserClassLoader(),
 					(opId) -> new ExecutionJobVertexCoordinatorContext(opId, this));
+
+			this.operatorCoordinators = Collections.unmodifiableMap(coordinators);
 		}
 		catch (IOException | ClassNotFoundException e) {
 			throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
@@ -402,8 +404,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return operatorCoordinators.get(operatorId);
 	}
 
+	public Map<OperatorID, OperatorCoordinator> getOperatorCoordinatorMap() {
+		return operatorCoordinators;
+	}
+
 	public Collection<OperatorCoordinator> getOperatorCoordinators() {
-		return Collections.unmodifiableCollection(operatorCoordinators.values());
+		return operatorCoordinators.values();
 	}
 
 	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index b5951c2..eaa24ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -67,7 +67,18 @@ public interface OperatorCoordinator extends AutoCloseable {
 
 	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
 
-	void checkpointComplete(long checkpointId) throws Exception;
+	/**
+	 * Notifies the coordinator that the checkpoint with the given checkpointId completed and
+	 * was committed.
+	 *
+	 * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
+	 * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
+	 * aborted any more. If the coordinator gets into an inconsistent state internally, it should
+	 * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
+	 * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
+	 * this method may be treated as a fatal error for the JobManager, crashing it and leading
+	 * to an expensive "master failover" procedure.
+	void checkpointComplete(long checkpointId);
 
 	void resetToCheckpoint(byte[] checkpointData) throws Exception;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e5ea59a..cff2571 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -460,6 +460,7 @@ public class CheckpointCoordinatorMasterHooksTest {
 				new ExecutionVertex[0],
 				ackVertices,
 				new ExecutionVertex[0],
+				Collections.emptyList(),
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10),
 				new MemoryStateBackend(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 14ff439..cf36f20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -659,6 +659,8 @@ public class CheckpointCoordinatorTestingUtils {
 
 		private ExecutionVertex[] tasksToCommitTo;
 
+		private Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint = Collections.emptyList();
+
 		private CheckpointIDCounter checkpointIDCounter =
 			new StandaloneCheckpointIDCounter();
 
@@ -718,6 +720,10 @@ public class CheckpointCoordinatorTestingUtils {
 			return this;
 		}
 
+		public void setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
+			this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
+		}
+
 		public CheckpointCoordinatorBuilder setCheckpointIDCounter(
 			CheckpointIDCounter checkpointIDCounter) {
 			this.checkpointIDCounter = checkpointIDCounter;
@@ -764,6 +770,7 @@ public class CheckpointCoordinatorTestingUtils {
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,
+				coordinatorsToCheckpoint,
 				checkpointIDCounter,
 				completedCheckpointStore,
 				checkpointStateBackend,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
index f281ff4..6d7b6cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertEquals;
@@ -79,6 +80,7 @@ public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
 			new ExecutionVertex[] { executionVertex },
 			new ExecutionVertex[] { executionVertex },
 			new ExecutionVertex[] { executionVertex },
+			Collections.emptyList(),
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			new MemoryStateBackend(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index c78c077..f172e51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,10 +31,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
 
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -46,6 +51,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,11 +65,12 @@ import java.util.concurrent.ScheduledFuture;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -189,7 +196,6 @@ public class PendingCheckpointTest {
 	 * Tests that abort discards state.
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testAbortDiscardsState() throws Exception {
 		CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
 		QueueExecutor executor = new QueueExecutor();
@@ -428,21 +434,109 @@ public class PendingCheckpointTest {
 		assertEquals("state", deserializedState);
 	}
 
+	@Test
+	public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(
+				createOperatorCoordinator(), createOperatorCoordinator());
+
+		assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
+		assertFalse(checkpoint.isFullyAcknowledged());
+	}
+
+	@Test
+	public void testAcknowledgedCoordinatorStates() throws Exception {
+		final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator();
+		final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator();
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2);
+
+		final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
+		final TaskAcknowledgeResult ack2 = checkpoint.acknowledgeCoordinatorState(coord2, null);
+
+		assertEquals(TaskAcknowledgeResult.SUCCESS, ack1);
+		assertEquals(TaskAcknowledgeResult.SUCCESS, ack2);
+		assertEquals(0, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
+		assertTrue(checkpoint.isFullyAcknowledged());
+		assertThat(checkpoint.getOperatorStates().keySet(), Matchers.contains(coord1.operatorId()));
+	}
+
+	@Test
+	public void testDuplicateAcknowledgeCoordinator() throws Exception {
+		final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator();
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator);
+
+		checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());
+		final TaskAcknowledgeResult secondAck = checkpoint.acknowledgeCoordinatorState(coordinator, null);
+
+		assertEquals(TaskAcknowledgeResult.DUPLICATE, secondAck);
+	}
+
+	@Test
+	public void testAcknowledgeUnknownCoordinator() throws Exception {
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+
+		final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+
+		assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
+	}
+
+	@Test
+	public void testDisposeDisposesCoordinatorStates() throws Exception {
+		final TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
+		final TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithAcknowledgedCoordinators(handle1, handle2);
+
+		checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
+
+		assertTrue(handle1.isDisposed());
+		assertTrue(handle2.isDisposed());
+	}
+
 	// ------------------------------------------------------------------------
 
 	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
-		return createPendingCheckpoint(props, Collections.emptyList(), Executors.directExecutor());
+		return createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), Executors.directExecutor());
 	}
 
 	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
-		return createPendingCheckpoint(props, Collections.emptyList(), executor);
+		return createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), executor);
 	}
 
 	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers) throws IOException {
-		return createPendingCheckpoint(props, masterStateIdentifiers, Executors.directExecutor());
+		return createPendingCheckpoint(props, Collections.emptyList(), masterStateIdentifiers, Executors.directExecutor());
 	}
 
-	private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers, Executor executor) throws IOException {
+	private PendingCheckpoint createPendingCheckpointWithCoordinators(
+			OperatorCoordinatorCheckpointContext... coordinators) throws IOException {
+
+		final PendingCheckpoint checkpoint = createPendingCheckpoint(
+				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+				OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+				Collections.emptyList(),
+				Executors.directExecutor());
+
+		checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+		return checkpoint;
+	}
+
+	private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(StreamStateHandle... handles) throws IOException {
+		OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
+		for (int i = 0; i < handles.length; i++) {
+			coords[i] = createOperatorCoordinator();
+		}
+
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords);
+		for (int i = 0; i < handles.length; i++) {
+			checkpoint.acknowledgeCoordinatorState(coords[i], handles[i]);
+		}
+
+		return checkpoint;
+	}
+
+	private PendingCheckpoint createPendingCheckpoint(
+			CheckpointProperties props,
+			Collection<OperatorID> operatorCoordinators,
+			Collection<String> masterStateIdentifiers,
+			Executor executor) throws IOException {
 
 		final Path checkpointDir = new Path(tmpFolder.newFolder().toURI());
 		final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
@@ -459,6 +553,7 @@ public class PendingCheckpointTest {
 			0,
 			1,
 			ackTasks,
+			operatorCoordinators,
 			masterStateIdentifiers,
 			props,
 			location,
@@ -466,6 +561,14 @@ public class PendingCheckpointTest {
 			new CompletableFuture<>());
 	}
 
+	private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
+		return new OperatorCoordinatorCheckpointContext(
+				new MockOperatorCoordinator(),
+				new OperatorID(),
+				256,
+				50);
+	}
+
 	@SuppressWarnings("unchecked")
 	static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
 		Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index f411a39..ef6ea85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -66,6 +66,12 @@ public class CheckpointTestUtils {
 
 			OperatorState taskState = new OperatorState(new OperatorID(), numSubtasksPerTask, 128);
 
+			final boolean hasCoordinatorState = random.nextBoolean();
+			if (hasCoordinatorState) {
+				final StreamStateHandle stateHandle = createDummyStreamStateHandle(random);
+				taskState.setCoordinatorState(stateHandle);
+			}
+
 			boolean hasOperatorStateBackend = random.nextBoolean();
 			boolean hasOperatorStateStream = random.nextBoolean();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
new file mode 100644
index 0000000..c4122fc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock implementation of the {@link OperatorCoordinator} interface whose methods all throw
+ * {@link UnsupportedOperationException}.
+ * If you need a testing stub, use the {@link TestingOperatorCoordinator} instead.
+ */
+public final class MockOperatorCoordinator implements OperatorCoordinator {
+
+	@Override
+	public void start() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void close() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void handleEventFromOperator(int subtask, OperatorEvent event) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void subtaskFailed(int subtask) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
new file mode 100644
index 0000000..7a968d2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple test mock for a {@link StreamStateHandle}.
+ */
+public class TestingStreamStateHandle implements StreamStateHandle {
+	private static final long serialVersionUID = 1L;
+
+	@Nullable
+	private final FSDataInputStream inputStream;
+
+	private final long size;
+
+	private boolean disposed;
+
+	public TestingStreamStateHandle() {
+		this(null, 0L);
+	}
+
+	public TestingStreamStateHandle(@Nullable FSDataInputStream inputStream, long size) {
+		this.inputStream = inputStream;
+		this.size = size;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public FSDataInputStream openInputStream() {
+		if (inputStream == null) {
+			throw new UnsupportedOperationException("no input stream provided");
+		}
+		return inputStream;
+	}
+
+	@Override
+	public void discardState() {
+		disposed = true;
+	}
+
+	@Override
+	public long getStateSize() {
+		return size;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public boolean isDisposed() {
+		return disposed;
+	}
+}