You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:08 UTC

[flink] 05/11: [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b233aa84a1b0d8354211506219d0f785bccae870
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:18:41 2020 +0200

    [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
    
    This simplifies both testing and future refactoring.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   7 +-
 .../OperatorCoordinatorCheckpointContext.java      |  41 +----
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  20 +--
 .../runtime/checkpoint/PendingCheckpoint.java      |   3 +-
 .../coordination/OperatorCoordinatorHolder.java    | 172 +++++++++++++--------
 .../coordination/OperatorInfo.java}                |  30 +---
 .../runtime/checkpoint/PendingCheckpointTest.java  |  27 ++--
 ...kpointContext.java => TestingOperatorInfo.java} |  31 +---
 8 files changed, 151 insertions(+), 180 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 51b98f5..da518ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -626,7 +627,7 @@ public class CheckpointCoordinator {
 			checkpointID,
 			timestamp,
 			ackTasks,
-			OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
+			OperatorInfo.getIds(coordinatorsToCheckpoint),
 			masterHooks.keySet(),
 			props,
 			checkpointStorageLocation,
@@ -1074,7 +1075,7 @@ public class CheckpointCoordinator {
 
 		// commit coordinators
 		for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
-			coordinatorContext.coordinator().checkpointComplete(checkpointId);
+			coordinatorContext.checkpointComplete(checkpointId);
 		}
 	}
 
@@ -1496,7 +1497,7 @@ public class CheckpointCoordinator {
 
 			final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
 			if (coordinatorState != null) {
-				coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+				coordContext.resetToCheckpoint(coordinatorState.getData());
 			}
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 92fd2aa..abc15b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -18,49 +18,24 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * This context is the interface through which the {@link CheckpointCoordinator} interacts with an
+ * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
 
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	OperatorCoordinator coordinator();
-
-	OperatorID operatorId();
-
-	int maxParallelism();
-
-	int currentParallelism();
-
-	// ------------------------------------------------------------------------
-	//  checkpoint triggering callbacks
-	// ------------------------------------------------------------------------
-
-	void onCallTriggerCheckpoint(long checkpointId);
-
-	void onCheckpointStateFutureComplete(long checkpointId);
+	CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
 
 	void afterSourceBarrierInjection(long checkpointId);
 
 	void abortCurrentTriggering();
 
-	// ------------------------------------------------------------------------
-	//  utils
-	// ------------------------------------------------------------------------
+	void checkpointComplete(long checkpointId);
 
-	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
-		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
-			.collect(Collectors.toList());
-	}
+	void resetToCheckpoint(byte[] checkpointData) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 68fc8f1..6215123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import java.util.ArrayList;
@@ -39,15 +40,15 @@ import java.util.concurrent.Executor;
 final class OperatorCoordinatorCheckpoints {
 
 	public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
-			final OperatorCoordinatorCheckpointContext coordinatorInfo,
+			final OperatorCoordinatorCheckpointContext coordinatorContext,
 			final long checkpointId) throws Exception {
 
 		final CompletableFuture<byte[]> checkpointFuture =
-				coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+			coordinatorContext.checkpointCoordinator(checkpointId);
 
 		return checkpointFuture.thenApply(
 				(state) -> new CoordinatorSnapshot(
-						coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+						coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
 		);
 	}
 
@@ -59,16 +60,7 @@ final class OperatorCoordinatorCheckpoints {
 
 		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
 			final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
-			coordinator.onCallTriggerCheckpoint(checkpointId);
-
 			individualSnapshots.add(checkpointFuture);
-			checkpointFuture.whenComplete((ignored, failure) -> {
-				if (failure != null) {
-					coordinator.abortCurrentTriggering();
-				} else {
-					coordinator.onCheckpointStateFutureComplete(checkpointId);
-				}
-			});
 		}
 
 		return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
@@ -144,10 +136,10 @@ final class OperatorCoordinatorCheckpoints {
 
 	static final class CoordinatorSnapshot {
 
-		final OperatorCoordinatorCheckpointContext coordinator;
+		final OperatorInfo coordinator;
 		final ByteStreamStateHandle state;
 
-		CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) {
+		CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
 			this.coordinator = coordinator;
 			this.state = state;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2a0eba7..376301d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -431,7 +432,7 @@ public class PendingCheckpoint {
 	}
 
 	public TaskAcknowledgeResult acknowledgeCoordinatorState(
-			OperatorCoordinatorCheckpointContext coordinatorInfo,
+			OperatorInfo coordinatorInfo,
 			@Nullable ByteStreamStateHandle stateHandle) {
 
 		synchronized (lock) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index c70e229..304eb20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -35,6 +36,7 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -67,7 +69,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private final int operatorParallelism;
 	private final int operatorMaxParallelism;
 
-	private long currentlyTriggeredCheckpoint;
+	private volatile long currentlyTriggeredCheckpoint;
 
 	private OperatorCoordinatorHolder(
 			final OperatorID operatorId,
@@ -88,21 +90,25 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-		context.lazyInitialize(scheduler, schedulerExecutor);
+		lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+	}
+
+	@VisibleForTesting
+	void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+		context.lazyInitialize(globalFailureHandler, schedulerExecutor);
 	}
 
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
 
-	@Override
-	public OperatorID operatorId() {
-		return operatorId;
+	public OperatorCoordinator coordinator() {
+		return coordinator;
 	}
 
 	@Override
-	public OperatorCoordinator coordinator() {
-		return coordinator;
+	public OperatorID operatorId() {
+		return operatorId;
 	}
 
 	@Override
@@ -116,39 +122,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	}
 
 	// ------------------------------------------------------------------------
-	//  Checkpointing Callbacks
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void onCallTriggerCheckpoint(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		currentlyTriggeredCheckpoint = checkpointId;
-	}
-
-	@Override
-	public void onCheckpointStateFutureComplete(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		eventValve.shutValve();
-	}
-
-	@Override
-	public void afterSourceBarrierInjection(long checkpointId) {
-		checkCheckpointAlreadyHappening(checkpointId);
-		eventValve.openValve();
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-	}
-
-	@Override
-	public void abortCurrentTriggering() {
-		eventValve.openValve();
-		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
-	}
-
-	private void checkCheckpointAlreadyHappening(long checkpointId) {
-		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
-	}
-
-	// ------------------------------------------------------------------------
 	//  OperatorCoordinator Interface
 	// ------------------------------------------------------------------------
 
@@ -177,7 +150,20 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
-		return coordinator.checkpointCoordinator(checkpointId);
+		setCurrentlyTriggeredCheckpoint(checkpointId);
+
+		final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+
+		// synchronously!!!, with the completion, we need to shut the event valve
+		checkpointFuture.whenComplete((ignored, failure) -> {
+			if (failure != null) {
+				abortCurrentTriggering();
+			} else {
+				onCheckpointStateFutureComplete(checkpointId);
+			}
+		});
+
+		return checkpointFuture;
 	}
 
 	@Override
@@ -187,10 +173,47 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		resetCheckpointTriggeringCheck();
+		eventValve.reset();
 		coordinator.resetToCheckpoint(checkpointData);
 	}
 
 	// ------------------------------------------------------------------------
+	//  Checkpointing Callbacks
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void afterSourceBarrierInjection(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		eventValve.openValve();
+		resetCheckpointTriggeringCheck();
+	}
+
+	@Override
+	public void abortCurrentTriggering() {
+		eventValve.openValve();
+		resetCheckpointTriggeringCheck();
+	}
+
+	void onCheckpointStateFutureComplete(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		eventValve.shutValve();
+	}
+
+	private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
+		checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
+	}
+
+	private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
+		verifyNoOtherCheckpointBeingTriggered(checkpointId);
+		currentlyTriggeredCheckpoint = checkpointId;
+	}
+
+	private void resetCheckpointTriggeringCheck() {
+		currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+	}
+
+	// ------------------------------------------------------------------------
 	//  Factories
 	// ------------------------------------------------------------------------
 
@@ -209,20 +232,41 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 					return executionAttempt.sendOperatorEvent(opId, serializedEvent);
 				};
 
-			final OperatorEventValve valve = new OperatorEventValve(eventSender);
-			final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
-			final OperatorCoordinator coordinator = provider.create(context);
-
-			return new OperatorCoordinatorHolder(
+			return create(
 					opId,
-					coordinator,
-					context,
-					valve,
+					provider,
+					eventSender,
+					jobVertex.getName(),
 					jobVertex.getParallelism(),
 					jobVertex.getMaxParallelism());
 		}
 	}
 
+	@VisibleForTesting
+	static OperatorCoordinatorHolder create(
+			final OperatorID opId,
+			final OperatorCoordinator.Provider coordinatorProvider,
+			final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+			final String operatorName,
+			final int operatorParallelism,
+			final int operatorMaxParallelism) {
+
+		final OperatorEventValve valve = new OperatorEventValve(eventSender);
+
+		final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(
+				opId, valve, operatorName, operatorParallelism);
+
+		final OperatorCoordinator coordinator = coordinatorProvider.create(context);
+
+		return new OperatorCoordinatorHolder(
+				opId,
+				coordinator,
+				context,
+				valve,
+				operatorParallelism,
+				operatorMaxParallelism);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Nested Classes
 	// ------------------------------------------------------------------------
@@ -239,33 +283,36 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 	private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
 
 		private final OperatorID operatorId;
-		private final ExecutionJobVertex jobVertex;
 		private final OperatorEventValve eventValve;
+		private final String operatorName;
+		private final int operatorParallelism;
 
-		private SchedulerNG scheduler;
+		private Consumer<Throwable> globalFailureHandler;
 		private Executor schedulerExecutor;
 
 		public LazyInitializedCoordinatorContext(
-				OperatorID operatorId,
-				ExecutionJobVertex jobVertex,
-				OperatorEventValve eventValve) {
+				final OperatorID operatorId,
+				final OperatorEventValve eventValve,
+				final String operatorName,
+				final int operatorParallelism) {
 			this.operatorId = checkNotNull(operatorId);
-			this.jobVertex = checkNotNull(jobVertex);
 			this.eventValve = checkNotNull(eventValve);
+			this.operatorName = checkNotNull(operatorName);
+			this.operatorParallelism = operatorParallelism;
 		}
 
-		void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
-			this.scheduler = checkNotNull(scheduler);
+		void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+			this.globalFailureHandler = checkNotNull(globalFailureHandler);
 			this.schedulerExecutor = checkNotNull(schedulerExecutor);
 		}
 
 		void unInitialize() {
-			this.scheduler = null;
+			this.globalFailureHandler = null;
 			this.schedulerExecutor = null;
 		}
 
 		boolean isInitialized() {
-			return jobVertex != null;
+			return schedulerExecutor != null;
 		}
 
 		private void checkInitialized() {
@@ -309,15 +356,14 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
 			checkInitialized();
 
 			final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" +
-				jobVertex.getName() + "' (operator " + operatorId + ").", cause);
+				operatorName + "' (operator " + operatorId + ").", cause);
 
-			schedulerExecutor.execute(() -> scheduler.handleGlobalFailure(e));
+			schedulerExecutor.execute(() -> globalFailureHandler.accept(e));
 		}
 
 		@Override
 		public int currentParallelism() {
-			checkInitialized();
-			return jobVertex.getParallelism();
+			return operatorParallelism;
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
similarity index 54%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
index 92fd2aa..a146940 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
@@ -16,25 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import java.util.Collection;
 import java.util.stream.Collectors;
 
 /**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * An interface to access basic properties of an operator in the context of its coordinator.
  */
-public interface OperatorCoordinatorCheckpointContext {
-
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	OperatorCoordinator coordinator();
+public interface OperatorInfo {
 
 	OperatorID operatorId();
 
@@ -43,24 +35,12 @@ public interface OperatorCoordinatorCheckpointContext {
 	int currentParallelism();
 
 	// ------------------------------------------------------------------------
-	//  checkpoint triggering callbacks
-	// ------------------------------------------------------------------------
-
-	void onCallTriggerCheckpoint(long checkpointId);
-
-	void onCheckpointStateFutureComplete(long checkpointId);
-
-	void afterSourceBarrierInjection(long checkpointId);
-
-	void abortCurrentTriggering();
-
-	// ------------------------------------------------------------------------
 	//  utils
 	// ------------------------------------------------------------------------
 
-	static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+	static Collection<OperatorID> getIds(Collection<? extends OperatorInfo> infos) {
 		return infos.stream()
-			.map(OperatorCoordinatorCheckpointContext::operatorId)
+			.map(OperatorInfo::operatorId)
 			.collect(Collectors.toList());
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b454633..30c1dc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -438,7 +439,7 @@ public class PendingCheckpointTest {
 	@Test
 	public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(
-				createOperatorCoordinator(), createOperatorCoordinator());
+				new TestingOperatorInfo(), new TestingOperatorInfo());
 
 		assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
 		assertFalse(checkpoint.isFullyAcknowledged());
@@ -446,8 +447,8 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testAcknowledgedCoordinatorStates() throws Exception {
-		final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator();
-		final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator();
+		final OperatorInfo coord1 = new TestingOperatorInfo();
+		final OperatorInfo coord2 = new TestingOperatorInfo();
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2);
 
 		final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
@@ -462,7 +463,7 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testDuplicateAcknowledgeCoordinator() throws Exception {
-		final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator();
+		final OperatorInfo coordinator = new TestingOperatorInfo();
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator);
 
 		checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());
@@ -473,9 +474,9 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testAcknowledgeUnknownCoordinator() throws Exception {
-		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(new TestingOperatorInfo());
 
-		final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+		final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(new TestingOperatorInfo(), null);
 
 		assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
 	}
@@ -507,11 +508,11 @@ public class PendingCheckpointTest {
 	}
 
 	private PendingCheckpoint createPendingCheckpointWithCoordinators(
-			OperatorCoordinatorCheckpointContext... coordinators) throws IOException {
+				OperatorInfo... coordinators) throws IOException {
 
 		final PendingCheckpoint checkpoint = createPendingCheckpoint(
 				CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-				OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+				OperatorInfo.getIds(Arrays.asList(coordinators)),
 				Collections.emptyList(),
 				Executors.directExecutor());
 
@@ -520,9 +521,9 @@ public class PendingCheckpointTest {
 	}
 
 	private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... handles) throws IOException {
-		OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
+		final OperatorInfo[] coords = new OperatorInfo[handles.length];
 		for (int i = 0; i < handles.length; i++) {
-			coords[i] = createOperatorCoordinator();
+			coords[i] = new TestingOperatorInfo();
 		}
 
 		final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords);
@@ -562,10 +563,6 @@ public class PendingCheckpointTest {
 			new CompletableFuture<>());
 	}
 
-	private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
-		return new MockOperatorCoordinatorCheckpointContext();
-	}
-
 	@SuppressWarnings("unchecked")
 	static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
 		Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
similarity index 60%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
index 7cab82f..f8d4a1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
@@ -18,40 +18,31 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
-import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 /**
- * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
+ * A testing implementation of the {@link OperatorInfo}.
  */
-public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
+public class TestingOperatorInfo implements OperatorInfo {
 
 	private final OperatorID operatorId;
-	private final OperatorCoordinator coordinator;
 	private final int parallelism;
 	private final int maxParallelism;
 
-	public MockOperatorCoordinatorCheckpointContext() {
-		this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+	public TestingOperatorInfo() {
+		this(new OperatorID(), 50, 256);
 	}
 
-	public MockOperatorCoordinatorCheckpointContext(
+	public TestingOperatorInfo(
 			OperatorID operatorId,
-			OperatorCoordinator coordinator,
 			int parallelism,
 			int maxParallelism) {
 		this.operatorId = operatorId;
-		this.coordinator = coordinator;
 		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
 	}
 
 	@Override
-	public OperatorCoordinator coordinator() {
-		return coordinator;
-	}
-
-	@Override
 	public OperatorID operatorId() {
 		return operatorId;
 	}
@@ -65,16 +56,4 @@ public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordin
 	public int currentParallelism() {
 		return parallelism;
 	}
-
-	@Override
-	public void onCallTriggerCheckpoint(long checkpointId) {}
-
-	@Override
-	public void onCheckpointStateFutureComplete(long checkpointId) {}
-
-	@Override
-	public void afterSourceBarrierInjection(long checkpointId) {}
-
-	@Override
-	public void abortCurrentTriggering() {}
 }