You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:08 UTC
[flink] 05/11: [FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b233aa84a1b0d8354211506219d0f785bccae870
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 28 18:18:41 2020 +0200
[FLINK-16986][coordination][refactor] Reduce dependencies of OperatorCoordinatorHolder and OperatorCoordinatorCheckpointContext
This simplifies both testing and future refactoring.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 7 +-
.../OperatorCoordinatorCheckpointContext.java | 41 +----
.../checkpoint/OperatorCoordinatorCheckpoints.java | 20 +--
.../runtime/checkpoint/PendingCheckpoint.java | 3 +-
.../coordination/OperatorCoordinatorHolder.java | 172 +++++++++++++--------
.../coordination/OperatorInfo.java} | 30 +---
.../runtime/checkpoint/PendingCheckpointTest.java | 27 ++--
...kpointContext.java => TestingOperatorInfo.java} | 31 +---
8 files changed, 151 insertions(+), 180 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 51b98f5..da518ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -626,7 +627,7 @@ public class CheckpointCoordinator {
checkpointID,
timestamp,
ackTasks,
- OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
+ OperatorInfo.getIds(coordinatorsToCheckpoint),
masterHooks.keySet(),
props,
checkpointStorageLocation,
@@ -1074,7 +1075,7 @@ public class CheckpointCoordinator {
// commit coordinators
for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
- coordinatorContext.coordinator().checkpointComplete(checkpointId);
+ coordinatorContext.checkpointComplete(checkpointId);
}
}
@@ -1496,7 +1497,7 @@ public class CheckpointCoordinator {
final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
if (coordinatorState != null) {
- coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+ coordContext.resetToCheckpoint(coordinatorState.getData());
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 92fd2aa..abc15b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -18,49 +18,24 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
/**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * This context is the interface through which the {@link CheckpointCoordinator} interacts with an
+ * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
*/
-public interface OperatorCoordinatorCheckpointContext {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
- OperatorCoordinator coordinator();
-
- OperatorID operatorId();
-
- int maxParallelism();
-
- int currentParallelism();
-
- // ------------------------------------------------------------------------
- // checkpoint triggering callbacks
- // ------------------------------------------------------------------------
-
- void onCallTriggerCheckpoint(long checkpointId);
-
- void onCheckpointStateFutureComplete(long checkpointId);
+ CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
void afterSourceBarrierInjection(long checkpointId);
void abortCurrentTriggering();
- // ------------------------------------------------------------------------
- // utils
- // ------------------------------------------------------------------------
+ void checkpointComplete(long checkpointId);
- static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
- return infos.stream()
- .map(OperatorCoordinatorCheckpointContext::operatorId)
- .collect(Collectors.toList());
- }
+ void resetToCheckpoint(byte[] checkpointData) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 68fc8f1..6215123 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import java.util.ArrayList;
@@ -39,15 +40,15 @@ import java.util.concurrent.Executor;
final class OperatorCoordinatorCheckpoints {
public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoint(
- final OperatorCoordinatorCheckpointContext coordinatorInfo,
+ final OperatorCoordinatorCheckpointContext coordinatorContext,
final long checkpointId) throws Exception {
final CompletableFuture<byte[]> checkpointFuture =
- coordinatorInfo.coordinator().checkpointCoordinator(checkpointId);
+ coordinatorContext.checkpointCoordinator(checkpointId);
return checkpointFuture.thenApply(
(state) -> new CoordinatorSnapshot(
- coordinatorInfo, new ByteStreamStateHandle(coordinatorInfo.operatorId().toString(), state))
+ coordinatorContext, new ByteStreamStateHandle(coordinatorContext.operatorId().toString(), state))
);
}
@@ -59,16 +60,7 @@ final class OperatorCoordinatorCheckpoints {
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
- coordinator.onCallTriggerCheckpoint(checkpointId);
-
individualSnapshots.add(checkpointFuture);
- checkpointFuture.whenComplete((ignored, failure) -> {
- if (failure != null) {
- coordinator.abortCurrentTriggering();
- } else {
- coordinator.onCheckpointStateFutureComplete(checkpointId);
- }
- });
}
return FutureUtils.combineAll(individualSnapshots).thenApply(AllCoordinatorSnapshots::new);
@@ -144,10 +136,10 @@ final class OperatorCoordinatorCheckpoints {
static final class CoordinatorSnapshot {
- final OperatorCoordinatorCheckpointContext coordinator;
+ final OperatorInfo coordinator;
final ByteStreamStateHandle state;
- CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) {
+ CoordinatorSnapshot(OperatorInfo coordinator, ByteStreamStateHandle state) {
this.coordinator = coordinator;
this.state = state;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2a0eba7..376301d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -431,7 +432,7 @@ public class PendingCheckpoint {
}
public TaskAcknowledgeResult acknowledgeCoordinatorState(
- OperatorCoordinatorCheckpointContext coordinatorInfo,
+ OperatorInfo coordinatorInfo,
@Nullable ByteStreamStateHandle stateHandle) {
synchronized (lock) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index c70e229..304eb20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -35,6 +36,7 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -67,7 +69,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
private final int operatorParallelism;
private final int operatorMaxParallelism;
- private long currentlyTriggeredCheckpoint;
+ private volatile long currentlyTriggeredCheckpoint;
private OperatorCoordinatorHolder(
final OperatorID operatorId,
@@ -88,21 +90,25 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
}
public void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
- context.lazyInitialize(scheduler, schedulerExecutor);
+ lazyInitialize(scheduler::handleGlobalFailure, schedulerExecutor);
+ }
+
+ @VisibleForTesting
+ void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+ context.lazyInitialize(globalFailureHandler, schedulerExecutor);
}
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
- @Override
- public OperatorID operatorId() {
- return operatorId;
+ public OperatorCoordinator coordinator() {
+ return coordinator;
}
@Override
- public OperatorCoordinator coordinator() {
- return coordinator;
+ public OperatorID operatorId() {
+ return operatorId;
}
@Override
@@ -116,39 +122,6 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
}
// ------------------------------------------------------------------------
- // Checkpointing Callbacks
- // ------------------------------------------------------------------------
-
- @Override
- public void onCallTriggerCheckpoint(long checkpointId) {
- checkCheckpointAlreadyHappening(checkpointId);
- currentlyTriggeredCheckpoint = checkpointId;
- }
-
- @Override
- public void onCheckpointStateFutureComplete(long checkpointId) {
- checkCheckpointAlreadyHappening(checkpointId);
- eventValve.shutValve();
- }
-
- @Override
- public void afterSourceBarrierInjection(long checkpointId) {
- checkCheckpointAlreadyHappening(checkpointId);
- eventValve.openValve();
- currentlyTriggeredCheckpoint = NO_CHECKPOINT;
- }
-
- @Override
- public void abortCurrentTriggering() {
- eventValve.openValve();
- currentlyTriggeredCheckpoint = NO_CHECKPOINT;
- }
-
- private void checkCheckpointAlreadyHappening(long checkpointId) {
- checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
- }
-
- // ------------------------------------------------------------------------
// OperatorCoordinator Interface
// ------------------------------------------------------------------------
@@ -177,7 +150,20 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
@Override
public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
- return coordinator.checkpointCoordinator(checkpointId);
+ setCurrentlyTriggeredCheckpoint(checkpointId);
+
+ final CompletableFuture<byte[]> checkpointFuture = coordinator.checkpointCoordinator(checkpointId);
+
+ // synchronously!!!, with the completion, we need to shut the event valve
+ checkpointFuture.whenComplete((ignored, failure) -> {
+ if (failure != null) {
+ abortCurrentTriggering();
+ } else {
+ onCheckpointStateFutureComplete(checkpointId);
+ }
+ });
+
+ return checkpointFuture;
}
@Override
@@ -187,10 +173,47 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
@Override
public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+ resetCheckpointTriggeringCheck();
+ eventValve.reset();
coordinator.resetToCheckpoint(checkpointData);
}
// ------------------------------------------------------------------------
+ // Checkpointing Callbacks
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void afterSourceBarrierInjection(long checkpointId) {
+ verifyNoOtherCheckpointBeingTriggered(checkpointId);
+ eventValve.openValve();
+ resetCheckpointTriggeringCheck();
+ }
+
+ @Override
+ public void abortCurrentTriggering() {
+ eventValve.openValve();
+ resetCheckpointTriggeringCheck();
+ }
+
+ void onCheckpointStateFutureComplete(long checkpointId) {
+ verifyNoOtherCheckpointBeingTriggered(checkpointId);
+ eventValve.shutValve();
+ }
+
+ private void verifyNoOtherCheckpointBeingTriggered(long checkpointId) {
+ checkState(currentlyTriggeredCheckpoint == NO_CHECKPOINT || currentlyTriggeredCheckpoint == checkpointId);
+ }
+
+ private void setCurrentlyTriggeredCheckpoint(long checkpointId) {
+ verifyNoOtherCheckpointBeingTriggered(checkpointId);
+ currentlyTriggeredCheckpoint = checkpointId;
+ }
+
+ private void resetCheckpointTriggeringCheck() {
+ currentlyTriggeredCheckpoint = NO_CHECKPOINT;
+ }
+
+ // ------------------------------------------------------------------------
// Factories
// ------------------------------------------------------------------------
@@ -209,20 +232,41 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
return executionAttempt.sendOperatorEvent(opId, serializedEvent);
};
- final OperatorEventValve valve = new OperatorEventValve(eventSender);
- final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(opId, jobVertex, valve);
- final OperatorCoordinator coordinator = provider.create(context);
-
- return new OperatorCoordinatorHolder(
+ return create(
opId,
- coordinator,
- context,
- valve,
+ provider,
+ eventSender,
+ jobVertex.getName(),
jobVertex.getParallelism(),
jobVertex.getMaxParallelism());
}
}
+ @VisibleForTesting
+ static OperatorCoordinatorHolder create(
+ final OperatorID opId,
+ final OperatorCoordinator.Provider coordinatorProvider,
+ final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender,
+ final String operatorName,
+ final int operatorParallelism,
+ final int operatorMaxParallelism) {
+
+ final OperatorEventValve valve = new OperatorEventValve(eventSender);
+
+ final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext(
+ opId, valve, operatorName, operatorParallelism);
+
+ final OperatorCoordinator coordinator = coordinatorProvider.create(context);
+
+ return new OperatorCoordinatorHolder(
+ opId,
+ coordinator,
+ context,
+ valve,
+ operatorParallelism,
+ operatorMaxParallelism);
+ }
+
// ------------------------------------------------------------------------
// Nested Classes
// ------------------------------------------------------------------------
@@ -239,33 +283,36 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
private final OperatorID operatorId;
- private final ExecutionJobVertex jobVertex;
private final OperatorEventValve eventValve;
+ private final String operatorName;
+ private final int operatorParallelism;
- private SchedulerNG scheduler;
+ private Consumer<Throwable> globalFailureHandler;
private Executor schedulerExecutor;
public LazyInitializedCoordinatorContext(
- OperatorID operatorId,
- ExecutionJobVertex jobVertex,
- OperatorEventValve eventValve) {
+ final OperatorID operatorId,
+ final OperatorEventValve eventValve,
+ final String operatorName,
+ final int operatorParallelism) {
this.operatorId = checkNotNull(operatorId);
- this.jobVertex = checkNotNull(jobVertex);
this.eventValve = checkNotNull(eventValve);
+ this.operatorName = checkNotNull(operatorName);
+ this.operatorParallelism = operatorParallelism;
}
- void lazyInitialize(SchedulerNG scheduler, Executor schedulerExecutor) {
- this.scheduler = checkNotNull(scheduler);
+ void lazyInitialize(Consumer<Throwable> globalFailureHandler, Executor schedulerExecutor) {
+ this.globalFailureHandler = checkNotNull(globalFailureHandler);
this.schedulerExecutor = checkNotNull(schedulerExecutor);
}
void unInitialize() {
- this.scheduler = null;
+ this.globalFailureHandler = null;
this.schedulerExecutor = null;
}
boolean isInitialized() {
- return jobVertex != null;
+ return schedulerExecutor != null;
}
private void checkInitialized() {
@@ -309,15 +356,14 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
checkInitialized();
final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" +
- jobVertex.getName() + "' (operator " + operatorId + ").", cause);
+ operatorName + "' (operator " + operatorId + ").", cause);
- schedulerExecutor.execute(() -> scheduler.handleGlobalFailure(e));
+ schedulerExecutor.execute(() -> globalFailureHandler.accept(e));
}
@Override
public int currentParallelism() {
- checkInitialized();
- return jobVertex.getParallelism();
+ return operatorParallelism;
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
similarity index 54%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
index 92fd2aa..a146940 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorInfo.java
@@ -16,25 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.operators.coordination;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import java.util.Collection;
import java.util.stream.Collectors;
/**
- * An {@link OperatorCoordinator} and its contextual information needed to trigger and
- * acknowledge a checkpoint.
+ * An interface to access basic properties of an operator in the context of its coordinator.
*/
-public interface OperatorCoordinatorCheckpointContext {
-
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
- OperatorCoordinator coordinator();
+public interface OperatorInfo {
OperatorID operatorId();
@@ -43,24 +35,12 @@ public interface OperatorCoordinatorCheckpointContext {
int currentParallelism();
// ------------------------------------------------------------------------
- // checkpoint triggering callbacks
- // ------------------------------------------------------------------------
-
- void onCallTriggerCheckpoint(long checkpointId);
-
- void onCheckpointStateFutureComplete(long checkpointId);
-
- void afterSourceBarrierInjection(long checkpointId);
-
- void abortCurrentTriggering();
-
- // ------------------------------------------------------------------------
// utils
// ------------------------------------------------------------------------
- static Collection<OperatorID> getIds(Collection<OperatorCoordinatorCheckpointContext> infos) {
+ static Collection<OperatorID> getIds(Collection<? extends OperatorInfo> infos) {
return infos.stream()
- .map(OperatorCoordinatorCheckpointContext::operatorId)
+ .map(OperatorInfo::operatorId)
.collect(Collectors.toList());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index b454633..30c1dc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -32,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCheckpointContext;
+import org.apache.flink.runtime.operators.coordination.OperatorInfo;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -438,7 +439,7 @@ public class PendingCheckpointTest {
@Test
public void testInitiallyUnacknowledgedCoordinatorStates() throws Exception {
final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(
- createOperatorCoordinator(), createOperatorCoordinator());
+ new TestingOperatorInfo(), new TestingOperatorInfo());
assertEquals(2, checkpoint.getNumberOfNonAcknowledgedOperatorCoordinators());
assertFalse(checkpoint.isFullyAcknowledged());
@@ -446,8 +447,8 @@ public class PendingCheckpointTest {
@Test
public void testAcknowledgedCoordinatorStates() throws Exception {
- final OperatorCoordinatorCheckpointContext coord1 = createOperatorCoordinator();
- final OperatorCoordinatorCheckpointContext coord2 = createOperatorCoordinator();
+ final OperatorInfo coord1 = new TestingOperatorInfo();
+ final OperatorInfo coord2 = new TestingOperatorInfo();
final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coord1, coord2);
final TaskAcknowledgeResult ack1 = checkpoint.acknowledgeCoordinatorState(coord1, new TestingStreamStateHandle());
@@ -462,7 +463,7 @@ public class PendingCheckpointTest {
@Test
public void testDuplicateAcknowledgeCoordinator() throws Exception {
- final OperatorCoordinatorCheckpointContext coordinator = createOperatorCoordinator();
+ final OperatorInfo coordinator = new TestingOperatorInfo();
final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coordinator);
checkpoint.acknowledgeCoordinatorState(coordinator, new TestingStreamStateHandle());
@@ -473,9 +474,9 @@ public class PendingCheckpointTest {
@Test
public void testAcknowledgeUnknownCoordinator() throws Exception {
- final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(createOperatorCoordinator());
+ final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(new TestingOperatorInfo());
- final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(createOperatorCoordinator(), null);
+ final TaskAcknowledgeResult ack = checkpoint.acknowledgeCoordinatorState(new TestingOperatorInfo(), null);
assertEquals(TaskAcknowledgeResult.UNKNOWN, ack);
}
@@ -507,11 +508,11 @@ public class PendingCheckpointTest {
}
private PendingCheckpoint createPendingCheckpointWithCoordinators(
- OperatorCoordinatorCheckpointContext... coordinators) throws IOException {
+ OperatorInfo... coordinators) throws IOException {
final PendingCheckpoint checkpoint = createPendingCheckpoint(
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- OperatorCoordinatorCheckpointContext.getIds(Arrays.asList(coordinators)),
+ OperatorInfo.getIds(Arrays.asList(coordinators)),
Collections.emptyList(),
Executors.directExecutor());
@@ -520,9 +521,9 @@ public class PendingCheckpointTest {
}
private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... handles) throws IOException {
- OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
+ final OperatorInfo[] coords = new OperatorInfo[handles.length];
for (int i = 0; i < handles.length; i++) {
- coords[i] = createOperatorCoordinator();
+ coords[i] = new TestingOperatorInfo();
}
final PendingCheckpoint checkpoint = createPendingCheckpointWithCoordinators(coords);
@@ -562,10 +563,6 @@ public class PendingCheckpointTest {
new CompletableFuture<>());
}
- private static OperatorCoordinatorCheckpointContext createOperatorCoordinator() {
- return new MockOperatorCoordinatorCheckpointContext();
- }
-
@SuppressWarnings("unchecked")
static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
similarity index 60%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
index 7cab82f..f8d4a1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorCheckpointContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorInfo.java
@@ -18,40 +18,31 @@
package org.apache.flink.runtime.operators.coordination;
-import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.jobgraph.OperatorID;
/**
- * A testing mock implementation of the {@link OperatorCoordinatorCheckpointContext}.
+ * A testing implementation of the {@link OperatorInfo}.
*/
-public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordinatorCheckpointContext {
+public class TestingOperatorInfo implements OperatorInfo {
private final OperatorID operatorId;
- private final OperatorCoordinator coordinator;
private final int parallelism;
private final int maxParallelism;
- public MockOperatorCoordinatorCheckpointContext() {
- this(new OperatorID(), new MockOperatorCoordinator(), 50, 256);
+ public TestingOperatorInfo() {
+ this(new OperatorID(), 50, 256);
}
- public MockOperatorCoordinatorCheckpointContext(
+ public TestingOperatorInfo(
OperatorID operatorId,
- OperatorCoordinator coordinator,
int parallelism,
int maxParallelism) {
this.operatorId = operatorId;
- this.coordinator = coordinator;
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
}
@Override
- public OperatorCoordinator coordinator() {
- return coordinator;
- }
-
- @Override
public OperatorID operatorId() {
return operatorId;
}
@@ -65,16 +56,4 @@ public class MockOperatorCoordinatorCheckpointContext implements OperatorCoordin
public int currentParallelism() {
return parallelism;
}
-
- @Override
- public void onCallTriggerCheckpoint(long checkpointId) {}
-
- @Override
- public void onCheckpointStateFutureComplete(long checkpointId) {}
-
- @Override
- public void afterSourceBarrierInjection(long checkpointId) {}
-
- @Override
- public void abortCurrentTriggering() {}
}