You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/03/05 14:52:31 UTC
[flink] 06/06: [FLINK-16177][checkpointing] Integrate
OperatorCoordinator with checkpoints (triggering and committing)
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 963974f99d18d6a9f36fa78b792dcc2bc9e53de5
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 27 15:36:39 2020 +0100
[FLINK-16177][checkpointing] Integrate OperatorCoordinator with checkpoints (triggering and committing)
Restoring state to OperatorCoordinators is not included in this commit.
This closes #11274
---
.../runtime/checkpoint/CheckpointCoordinator.java | 34 ++++-
.../OperatorCoordinatorCheckpointContext.java | 76 +++++++++++
.../checkpoint/OperatorCoordinatorCheckpoints.java | 144 +++++++++++++++++++++
.../flink/runtime/checkpoint/OperatorState.java | 27 +++-
.../runtime/checkpoint/PendingCheckpoint.java | 66 +++++++++-
.../checkpoint/metadata/MetadataV3Serializer.java | 7 +-
.../runtime/executiongraph/ExecutionGraph.java | 21 +++
.../runtime/executiongraph/ExecutionJobVertex.java | 10 +-
.../coordination/OperatorCoordinator.java | 13 +-
.../CheckpointCoordinatorMasterHooksTest.java | 1 +
.../CheckpointCoordinatorTestingUtils.java | 7 +
.../FailoverStrategyCheckpointCoordinatorTest.java | 2 +
.../runtime/checkpoint/PendingCheckpointTest.java | 117 ++++++++++++++++-
.../checkpoint/metadata/CheckpointTestUtils.java | 6 +
.../coordination/MockOperatorCoordinator.java | 63 +++++++++
.../runtime/state/TestingStreamStateHandle.java | 72 +++++++++++
16 files changed, 643 insertions(+), 23 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 070e00c..4e23df3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -55,6 +55,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -110,6 +112,9 @@ public class CheckpointCoordinator {
/** Tasks who need to be sent a message when a checkpoint is confirmed. */
private final ExecutionVertex[] tasksToCommitTo;
+ /** The operator coordinators that need to be checkpointed. */
+ private final Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint;
+
/** Map from checkpoint ID to the pending checkpoint. */
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
@@ -203,6 +208,7 @@ public class CheckpointCoordinator {
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
+ Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
StateBackend checkpointStateBackend,
@@ -217,6 +223,7 @@ public class CheckpointCoordinator {
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
+ coordinatorsToCheckpoint,
checkpointIDCounter,
completedCheckpointStore,
checkpointStateBackend,
@@ -234,6 +241,7 @@ public class CheckpointCoordinator {
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
+ Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
StateBackend checkpointStateBackend,
@@ -267,6 +275,7 @@ public class CheckpointCoordinator {
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
+ this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
this.pendingCheckpoints = new LinkedHashMap<>();
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
@@ -548,8 +557,16 @@ public class CheckpointCoordinator {
onCompletionPromise),
timer);
- pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState)
+ final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
+ .thenCompose(this::snapshotMasterState);
+
+ final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
+ .thenComposeAsync((pendingCheckpoint) ->
+ OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+ coordinatorsToCheckpoint, pendingCheckpoint, timer),
+ timer);
+
+ CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
.whenCompleteAsync(
(ignored, throwable) -> {
final PendingCheckpoint checkpoint =
@@ -634,6 +651,7 @@ public class CheckpointCoordinator {
checkpointID,
timestamp,
ackTasks,
+ OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
masterHooks.keySet(),
props,
checkpointStorageLocation,
@@ -1080,15 +1098,23 @@ public class CheckpointCoordinator {
LOG.debug(builder.toString());
}
- // send the "notify complete" call to all vertices
- final long timestamp = completedCheckpoint.getTimestamp();
+ // send the "notify complete" call to all vertices, coordinators, etc.
+ sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
+ }
+ private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
+ // commit tasks
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}
+
+ // commit coordinators
+ for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
+ coordinatorContext.coordinator().checkpointComplete(checkpointId);
+ }
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
new file mode 100644
index 0000000..16acbb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link OperatorCoordinator} and its contextual information needed to trigger and
+ * acknowledge a checkpoint.
+ */
+public final class OperatorCoordinatorCheckpointContext {
+
+ private final OperatorCoordinator coordinator;
+
+ private final OperatorID operatorId;
+
+ private final int maxParallelism;
+
+ private final int currentParallelism;
+
+ public OperatorCoordinatorCheckpointContext(
+ OperatorCoordinator coordinator,
+ OperatorID operatorId,
+ int maxParallelism,
+ int currentParallelism) {
+
+ this.coordinator = checkNotNull(coordinator);
+ this.operatorId = checkNotNull(operatorId);
+ this.maxParallelism = maxParallelism;
+ this.currentParallelism = currentParallelism;
+ }
+
+ public OperatorCoordinator coordinator() {
+ return coordinator;
+ }
+
+ public OperatorID operatorId() {
+ return operatorId;
+ }
+
+ public int maxParallelism() {
+ return maxParallelism;
+ }
+
+ public int currentParallelism() {
+ return currentParallelism;
+ }
+
+ public static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+ return infos.stream()
+ .map(OperatorCoordinatorCheckpointContext::operatorId)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
new file mode 100644
index 0000000..cfacec8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+/**
+ * All the logic related to taking checkpoints of the {@link OperatorCoordinator}s.
+ *
+ * <p>NOTE: This class has a simplified error handling logic. If one of the several coordinator checkpoints
+ * fails, no cleanup is triggered for the other concurrent ones. That is okay, since they all produce just byte[]
+ * as the result. We have to change that once we allow them to create external resources that actually need
+ * to be cleaned up.
+ */
+final class OperatorCoordinatorCheckpoints {
+
+ public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
+ final OperatorCoordinatorCheckpointContext coordinatorInfo,
+ final long checkpointId) throws Exception {
+
+ final CompletableFuture<byte[]> checkpointFuture =
+ coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+
+ return checkpointFuture.thenApply(
+ (state) -> new CoordinatorSnapshot(
+ coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+ );
+ }
+
+ public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
+ final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+ final long checkpointId) throws Exception {
+
+ final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
+
+ for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
+ individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+ }
+
+ return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
+ }
+
+ public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpoints(
+ final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+ final PendingCheckpoint checkpoint,
+ final Executor acknowledgeExecutor) throws Exception {
+
+ final CompletableFuture<AllCoordinatorSnapshots> snapshots =
+ triggerAllCoordinatorCheckpoints(coordinators, checkpoint.getCheckpointId());
+
+ return snapshots
+ .thenAcceptAsync(
+ (allSnapshots) -> {
+ try {
+ acknowledgeAllCoordinators(checkpoint, allSnapshots.snapshots);
+ }
+ catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ },
+ acknowledgeExecutor);
+ }
+
+ public static CompletableFuture<Void> triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+ final Collection<OperatorCoordinatorCheckpointContext> coordinators,
+ final PendingCheckpoint checkpoint,
+ final Executor acknowledgeExecutor) throws CompletionException {
+
+ try {
+ return triggerAndAcknowledgeAllCoordinatorCheckpoints(coordinators, checkpoint, acknowledgeExecutor);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static void acknowledgeAllCoordinators(PendingCheckpoint checkpoint, Collection<CoordinatorSnapshot> snapshots) throws CheckpointException {
+ for (final CoordinatorSnapshot snapshot : snapshots) {
+ final PendingCheckpoint.TaskAcknowledgeResult result =
+ checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
+
+ if (result != PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
+ throw new CheckpointException("Coordinator state not acknowledged successfully: " + result,
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ static final class AllCoordinatorSnapshots {
+
+ private final Collection<CoordinatorSnapshot> snapshots;
+
+ AllCoordinatorSnapshots(Collection<CoordinatorSnapshot> snapshots) {
+ this.snapshots = snapshots;
+ }
+
+ public Iterable<CoordinatorSnapshot> snapshots() {
+ return snapshots;
+ }
+ }
+
+ static final class CoordinatorSnapshot {
+
+ final OperatorCoordinatorCheckpointContext coordinator;
+ final StreamStateHandle state;
+
+ CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, StreamStateHandle state) {
+ // if this is not true any more, we need more elaborate dispose/cleanup handling
+ // see comment above the class.
+ assert state instanceof ByteStreamStateHandle;
+
+ this.coordinator = coordinator;
+ this.state = state;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index de011c1..998a3bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* Simple container class which contains the raw/managed operator state and key-group state handles from all sub
* tasks of an operator and therefore represents the complete state of a logical operator.
@@ -43,6 +48,10 @@ public class OperatorState implements CompositeStateHandle {
/** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */
private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
+ /** The state of the operator coordinator. Null, if no such state exists. */
+ @Nullable
+ private StreamStateHandle coordinatorState;
+
/** The parallelism of the operator when it was checkpointed. */
private final int parallelism;
@@ -87,6 +96,16 @@ public class OperatorState implements CompositeStateHandle {
}
}
+ public void setCoordinatorState(@Nullable StreamStateHandle coordinatorState) {
+ checkState(this.coordinatorState == null, "coordinator state already set");
+ this.coordinatorState = coordinatorState;
+ }
+
+ @Nullable
+ public StreamStateHandle getCoordinatorState() {
+ return coordinatorState;
+ }
+
public Map<Integer, OperatorSubtaskState> getSubtaskStates() {
return Collections.unmodifiableMap(operatorSubtaskStates);
}
@@ -112,6 +131,10 @@ public class OperatorState implements CompositeStateHandle {
for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
operatorSubtaskState.discardState();
}
+
+ if (coordinatorState != null) {
+ coordinatorState.discardState();
+ }
}
@Override
@@ -123,7 +146,7 @@ public class OperatorState implements CompositeStateHandle {
@Override
public long getStateSize() {
- long result = 0L;
+ long result = coordinatorState == null ? 0L : coordinatorState.getStateSize();
for (int i = 0; i < parallelism; i++) {
OperatorSubtaskState operatorSubtaskState = operatorSubtaskStates.get(i);
@@ -142,6 +165,7 @@ public class OperatorState implements CompositeStateHandle {
return operatorID.equals(other.operatorID)
&& parallelism == other.parallelism
+ && Objects.equals(coordinatorState, other.coordinatorState)
&& operatorSubtaskStates.equals(other.operatorSubtaskStates);
} else {
return false;
@@ -161,6 +185,7 @@ public class OperatorState implements CompositeStateHandle {
"operatorID: " + operatorID +
", parallelism: " + parallelism +
", maxParallelism: " + maxParallelism +
+ ", coordinatorState: " + (coordinatorState == null ? "(none)" : coordinatorState.getStateSize() + " bytes") +
", sub task states: " + operatorSubtaskStates.size() +
", total size (bytes): " + getStateSize() +
')';
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index ae9027d..27a8513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -88,6 +90,8 @@ public class PendingCheckpoint {
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
+ private final Set<OperatorID> notYetAcknowledgedOperatorCoordinators;
+
private final List<MasterState> masterStates;
private final Set<String> notYetAcknowledgedMasterStates;
@@ -126,6 +130,7 @@ public class PendingCheckpoint {
long checkpointId,
long checkpointTimestamp,
Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+ Collection<OperatorID> operatorCoordinatorsToConfirm,
Collection<String> masterStateIdentifiers,
CheckpointProperties props,
CheckpointStorageLocation targetLocation,
@@ -145,7 +150,10 @@ public class PendingCheckpoint {
this.operatorStates = new HashMap<>();
this.masterStates = new ArrayList<>(masterStateIdentifiers.size());
- this.notYetAcknowledgedMasterStates = new HashSet<>(masterStateIdentifiers);
+ this.notYetAcknowledgedMasterStates = masterStateIdentifiers.isEmpty()
+ ? Collections.emptySet() : new HashSet<>(masterStateIdentifiers);
+ this.notYetAcknowledgedOperatorCoordinators = operatorCoordinatorsToConfirm.isEmpty()
+ ? Collections.emptySet() : new HashSet<>(operatorCoordinatorsToConfirm);
this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
this.onCompletionPromise = checkNotNull(onCompletionPromise);
}
@@ -176,6 +184,10 @@ public class PendingCheckpoint {
return notYetAcknowledgedTasks.size();
}
+ public int getNumberOfNonAcknowledgedOperatorCoordinators() {
+ return notYetAcknowledgedOperatorCoordinators.size();
+ }
+
public int getNumberOfAcknowledgedTasks() {
return numAcknowledgedTasks;
}
@@ -188,11 +200,21 @@ public class PendingCheckpoint {
return masterStates;
}
- public boolean areMasterStatesFullyAcknowledged() {
+ public boolean isFullyAcknowledged() {
+ return areTasksFullyAcknowledged() &&
+ areCoordinatorsFullyAcknowledged() &&
+ areMasterStatesFullyAcknowledged();
+ }
+
+ boolean areMasterStatesFullyAcknowledged() {
return notYetAcknowledgedMasterStates.isEmpty() && !discarded;
}
- public boolean areTasksFullyAcknowledged() {
+ boolean areCoordinatorsFullyAcknowledged() {
+ return notYetAcknowledgedOperatorCoordinators.isEmpty() && !discarded;
+ }
+
+ boolean areTasksFullyAcknowledged() {
return notYetAcknowledgedTasks.isEmpty() && !discarded;
}
@@ -270,10 +292,8 @@ public class PendingCheckpoint {
public CompletedCheckpoint finalizeCheckpoint() throws IOException {
synchronized (lock) {
- checkState(areMasterStatesFullyAcknowledged(),
- "Pending checkpoint has not been fully acknowledged by master states yet.");
- checkState(areTasksFullyAcknowledged(),
- "Pending checkpoint has not been fully acknowledged by tasks yet.");
+ checkState(!isDiscarded(), "checkpoint is discarded");
+ checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");
// make sure we fulfill the promise with an exception if something fails
try {
@@ -410,6 +430,38 @@ public class PendingCheckpoint {
}
}
+ public TaskAcknowledgeResult acknowledgeCoordinatorState(
+ OperatorCoordinatorCheckpointContext coordinatorInfo,
+ @Nullable StreamStateHandle stateHandle) {
+
+ synchronized (lock) {
+ if (discarded) {
+ return TaskAcknowledgeResult.DISCARDED;
+ }
+
+ final OperatorID operatorId = coordinatorInfo.operatorId();
+ OperatorState operatorState = operatorStates.get(operatorId);
+
+ // sanity check for better error reporting
+ if (!notYetAcknowledgedOperatorCoordinators.remove(operatorId)) {
+ return operatorState != null && operatorState.getCoordinatorState() != null
+ ? TaskAcknowledgeResult.DUPLICATE
+ : TaskAcknowledgeResult.UNKNOWN;
+ }
+
+ if (stateHandle != null) {
+ if (operatorState == null) {
+ operatorState = new OperatorState(
+ operatorId, coordinatorInfo.currentParallelism(), coordinatorInfo.maxParallelism());
+ operatorStates.put(operatorId, operatorState);
+ }
+ operatorState.setCoordinatorState(stateHandle);
+ }
+
+ return TaskAcknowledgeResult.SUCCESS;
+ }
+ }
+
/**
* Acknowledges a master state (state generated on the checkpoint coordinator) to
* the pending checkpoint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index e304ef4..7db1255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -81,6 +81,9 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
dos.writeInt(operatorState.getParallelism());
dos.writeInt(operatorState.getMaxParallelism());
+ // Coordinator state
+ serializeStreamStateHandle(operatorState.getCoordinatorState(), dos);
+
// Sub task states
final Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
dos.writeInt(subtaskStateMap.size());
@@ -96,9 +99,11 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
final int parallelism = dis.readInt();
final int maxParallelism = dis.readInt();
- // Add task state
final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
+ // Coordinator state
+ operatorState.setCoordinatorState(deserializeStreamStateHandle(dis));
+
// Sub task states
final int numSubTaskStates = dis.readInt();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f8d79fa..09a6a3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
@@ -60,10 +61,12 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
@@ -457,6 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);
+ final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators = buildOpCoordinatorCheckpointContexts();
+
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
CheckpointFailureManager failureManager = new CheckpointFailureManager(
@@ -487,6 +492,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
+ operatorCoordinators,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
@@ -566,6 +572,21 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
+ private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
+ final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
+ for (final ExecutionJobVertex vertex : verticesInCreationOrder) {
+ for (final Map.Entry<OperatorID, OperatorCoordinator> coordinator : vertex.getOperatorCoordinatorMap().entrySet()) {
+ contexts.add(new OperatorCoordinatorCheckpointContext(
+ coordinator.getValue(),
+ coordinator.getKey(),
+ vertex.getMaxParallelism(),
+ vertex.getParallelism()));
+ }
+ }
+ contexts.trimToSize();
+ return contexts;
+ }
+
// --------------------------------------------------------------------------------------------
// Properties and Status of the Execution Graph
// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 4b60652..44d33fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -249,10 +249,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
try {
- this.operatorCoordinators = OperatorCoordinatorUtil.instantiateCoordinators(
+ final Map<OperatorID, OperatorCoordinator> coordinators = OperatorCoordinatorUtil.instantiateCoordinators(
jobVertex.getOperatorCoordinators(),
graph.getUserClassLoader(),
(opId) -> new ExecutionJobVertexCoordinatorContext(opId, this));
+
+ this.operatorCoordinators = Collections.unmodifiableMap(coordinators);
}
catch (IOException | ClassNotFoundException e) {
throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
@@ -402,8 +404,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return operatorCoordinators.get(operatorId);
}
+ public Map<OperatorID, OperatorCoordinator> getOperatorCoordinatorMap() {
+ return operatorCoordinators;
+ }
+
public Collection<OperatorCoordinator> getOperatorCoordinators() {
- return Collections.unmodifiableCollection(operatorCoordinators.values());
+ return operatorCoordinators.values();
}
public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index b5951c2..eaa24ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -67,7 +67,18 @@ public interface OperatorCoordinator extends AutoCloseable {
CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
- void checkpointComplete(long checkpointId) throws Exception;
+ /**
+ * Notifies the coordinator that the checkpoint with the given checkpointId has completed and
+ * was committed.
+ *
+ * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
+ * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
+ * aborted any more. If the coordinator gets into an inconsistent state internally, it should
+ * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
+ * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
+ * and leading to an expensive "master failover" procedure.
+ */
+ void checkpointComplete(long checkpointId);
void resetToCheckpoint(byte[] checkpointData) throws Exception;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e5ea59a..cff2571 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -460,6 +460,7 @@ public class CheckpointCoordinatorMasterHooksTest {
new ExecutionVertex[0],
ackVertices,
new ExecutionVertex[0],
+ Collections.emptyList(),
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
new MemoryStateBackend(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 14ff439..cf36f20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -659,6 +659,8 @@ public class CheckpointCoordinatorTestingUtils {
private ExecutionVertex[] tasksToCommitTo;
+ private Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint = Collections.emptyList();
+
private CheckpointIDCounter checkpointIDCounter =
new StandaloneCheckpointIDCounter();
@@ -718,6 +720,10 @@ public class CheckpointCoordinatorTestingUtils {
return this;
}
+ public void setCoordinatorsToCheckpoint(Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint) {
+ this.coordinatorsToCheckpoint = coordinatorsToCheckpoint;
+ }
+
public CheckpointCoordinatorBuilder setCheckpointIDCounter(
CheckpointIDCounter checkpointIDCounter) {
this.checkpointIDCounter = checkpointIDCounter;
@@ -764,6 +770,7 @@ public class CheckpointCoordinatorTestingUtils {
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
+ coordinatorsToCheckpoint,
checkpointIDCounter,
completedCheckpointStore,
checkpointStateBackend,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
index f281ff4..6d7b6cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertEquals;
@@ -79,6 +80,7 @@ public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
new ExecutionVertex[] { executionVertex },
new ExecutionVertex[] { executionVertex },
new ExecutionVertex[] { executionVertex },
+ Collections.emptyList(),
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
new MemoryStateBackend(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index c78c077..f172e51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,10 +31,14 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -46,6 +51,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -59,11 +65,12 @@ import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -189,7 +196,6 @@ public class PendingCheckpointTest {
* Tests that abort discards state.
*/
@Test
- @SuppressWarnings("unchecked")
public void testAbortDiscardsState() throws Exception {
CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
QueueExecutor executor = new QueueExecutor();
@@ -428,21 +434,109 @@ public class PendingCheckpointTest {
assertEquals("state", deserializedState);
}
+ @Test
+ public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(
+ createOperatorCoordinator(), createOperatorCoordinator());
+
+ assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
+ assertFalse(checkpoint.isFullyAcknowledged());
+ }
+
+ @Test
+ public void testAcknowledgedCoordinatorStates() throws Exception {
+ final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator();
+ final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator();
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2);
+
+ final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
+ final TaskAcknowledgeResult ack2 = checkpoint.acknowledgeCoordinatorState(coord2, null);
+
+ assertEquals(TaskAcknowledgeResult.SUCCESS, ack1);
+ assertEquals(TaskAcknowledgeResult.SUCCESS, ack2);
+ assertEquals(0, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
+ assertTrue(checkpoint.isFullyAcknowledged());
+ assertThat(checkpoint.getOperatorStates().keySet(), Matchers.contains(coord1.operatorId()));
+ }
+
+ @Test
+ public void testDuplicateAcknowledgeCoordinator() throws Exception {
+ final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator();
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator);
+
+ checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());
+ final TaskAcknowledgeResult secondAck = checkpoint.acknowledgeCoordinatorState(coordinator, null);
+
+ assertEquals(TaskAcknowledgeResult.DUPLICATE, secondAck);
+ }
+
+ @Test
+ public void testAcknowledgeUnknownCoordinator() throws Exception {
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+
+ final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+
+ assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
+ }
+
+ @Test
+ public void testDisposeDisposesCoordinatorStates() throws Exception {
+ final TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
+ final TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithAcknowledgedCoordinators(handle1, handle2);
+
+ checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
+
+ assertTrue(handle1.isDisposed());
+ assertTrue(handle2.isDisposed());
+ }
+
// ------------------------------------------------------------------------
private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props) throws IOException {
- return createPendingCheckpoint(props, Collections.emptyList(), Executors.directExecutor());
+ return createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), Executors.directExecutor());
}
private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Executor executor) throws IOException {
- return createPendingCheckpoint(props, Collections.emptyList(), executor);
+ return createPendingCheckpoint(props, Collections.emptyList(), Collections.emptyList(), executor);
}
private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers) throws IOException {
- return createPendingCheckpoint(props, masterStateIdentifiers, Executors.directExecutor());
+ return createPendingCheckpoint(props, Collections.emptyList(), masterStateIdentifiers, Executors.directExecutor());
}
- private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Collection<String> masterStateIdentifiers, Executor executor) throws IOException {
+ private PendingCheckpoint createPendingCheckpointWithCoordinators(
+ OperatorCoordinatorCheckpointContext... coordinators) throws IOException {
+
+ final PendingCheckpoint checkpoint = createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+ Collections.emptyList(),
+ Executors.directExecutor());
+
+ checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ return checkpoint;
+ }
+
+ private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(StreamStateHandle... handles) throws IOException {
+ OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
+ for (int i = 0; i < handles.length; i++) {
+ coords[i] = createOperatorCoordinator();
+ }
+
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords);
+ for (int i = 0; i < handles.length; i++) {
+ checkpoint.acknowledgeCoordinatorState(coords[i], handles[i]);
+ }
+
+ return checkpoint;
+ }
+
+ private PendingCheckpoint createPendingCheckpoint(
+ CheckpointProperties props,
+ Collection<OperatorID> operatorCoordinators,
+ Collection<String> masterStateIdentifiers,
+ Executor executor) throws IOException {
final Path checkpointDir = new Path(tmpFolder.newFolder().toURI());
final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
@@ -459,6 +553,7 @@ public class PendingCheckpointTest {
0,
1,
ackTasks,
+ operatorCoordinators,
masterStateIdentifiers,
props,
location,
@@ -466,6 +561,14 @@ public class PendingCheckpointTest {
new CompletableFuture<>());
}
+ private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
+ return new OperatorCoordinatorCheckpointContext(
+ new MockOperatorCoordinator(),
+ new OperatorID(),
+ 256,
+ 50);
+ }
+
@SuppressWarnings("unchecked")
static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index f411a39..ef6ea85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -66,6 +66,12 @@ public class CheckpointTestUtils {
OperatorState taskState = new OperatorState(new OperatorID(), numSubtasksPerTask, 128);
+ final boolean hasCoordinatorState = random.nextBoolean();
+ if (hasCoordinatorState) {
+ final StreamStateHandle stateHandle = createDummyStreamStateHandle(random);
+ taskState.setCoordinatorState(stateHandle);
+ }
+
boolean hasOperatorStateBackend = random.nextBoolean();
boolean hasOperatorStateStream = random.nextBoolean();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
new file mode 100644
index 0000000..c4122fc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link OperatorCoordinator} implementation whose methods all throw {@link UnsupportedOperationException}.
+ * If you need a testing stub, use the {@link TestingOperatorCoordinator} instead.
+ */
+public final class MockOperatorCoordinator implements OperatorCoordinator {
+
+ @Override
+ public void start() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handleEventFromOperator(int subtask, OperatorEvent event) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void subtaskFailed(int subtask) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void checkpointComplete(long checkpointId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void resetToCheckpoint(byte[] checkpointData) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
new file mode 100644
index 0000000..7a968d2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple test mock for a {@link StreamStateHandle}.
+ */
+public class TestingStreamStateHandle implements StreamStateHandle {
+ private static final long serialVersionUID = 1L;
+
+ @Nullable
+ private final FSDataInputStream inputStream;
+
+ private final long size;
+
+ private boolean disposed;
+
+ public TestingStreamStateHandle() {
+ this(null, 0L);
+ }
+
+ public TestingStreamStateHandle(@Nullable FSDataInputStream inputStream, long size) {
+ this.inputStream = inputStream;
+ this.size = size;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public FSDataInputStream openInputStream() {
+ if (inputStream == null) {
+ throw new UnsupportedOperationException("no input stream provided");
+ }
+ return inputStream;
+ }
+
+ @Override
+ public void discardState() {
+ disposed = true;
+ }
+
+ @Override
+ public long getStateSize() {
+ return size;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public boolean isDisposed() {
+ return disposed;
+ }
+}