You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 09:58:21 UTC
[flink] 01/04: [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different places in the future
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e7d45d53b7ea7b9cfadf2e293ba790f3a9e90c3
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Feb 23 16:41:20 2022 +0100
[FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different places in the future
---
.../runtime/checkpoint/CheckpointCoordinator.java | 35 ++++----
.../runtime/checkpoint/CheckpointStatsTracker.java | 43 +--------
.../flink/runtime/checkpoint/Checkpoints.java | 3 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 31 ++++---
.../checkpoint/CompletedCheckpointStats.java | 24 +----
.../runtime/checkpoint/FailedCheckpointStats.java | 17 ----
.../runtime/checkpoint/PendingCheckpoint.java | 65 ++++++++------
.../runtime/checkpoint/PendingCheckpointStats.java | 90 +++++++------------
.../CheckpointCoordinatorMasterHooksTest.java | 6 +-
.../CheckpointCoordinatorRestoringTest.java | 9 +-
.../checkpoint/CheckpointCoordinatorTest.java | 3 +-
.../checkpoint/CheckpointStateRestoreTest.java | 6 +-
.../checkpoint/CheckpointStatsTrackerTest.java | 16 ++--
.../checkpoint/CompletedCheckpointStoreTest.java | 3 +-
.../checkpoint/CompletedCheckpointTest.java | 62 +++++++++----
.../DefaultCompletedCheckpointStoreTest.java | 3 +-
.../DefaultCompletedCheckpointStoreUtilsTest.java | 3 +-
.../checkpoint/PendingCheckpointStatsTest.java | 32 ++-----
.../runtime/checkpoint/PendingCheckpointTest.java | 100 +++------------------
.../StandaloneCompletedCheckpointStoreTest.java | 3 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 3 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 3 +-
.../runtime/scheduler/SchedulerUtilsTest.java | 3 +-
...topWithSavepointTerminationHandlerImplTest.java | 3 +-
24 files changed, 218 insertions(+), 348 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 335ea9a..5248538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -779,6 +779,9 @@ public class CheckpointCoordinator {
}
}
+ PendingCheckpointStats pendingCheckpointStats =
+ trackPendingCheckpointStats(checkpointID, checkpointPlan, props, timestamp);
+
final PendingCheckpoint checkpoint =
new PendingCheckpoint(
job,
@@ -789,9 +792,8 @@ public class CheckpointCoordinator {
masterHooks.keySet(),
props,
checkpointStorageLocation,
- onCompletionPromise);
-
- trackPendingCheckpointStats(checkpoint);
+ onCompletionPromise,
+ pendingCheckpointStats);
synchronized (lock) {
pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1122,8 +1124,7 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState(),
- message.getCheckpointMetrics(),
- getStatsCallback(checkpoint))) {
+ message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug(
"Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1313,7 +1314,7 @@ public class CheckpointCoordinator {
checkpointsCleaner,
this::scheduleTriggerRequest,
executor,
- getStatsCallback(pendingCheckpoint));
+ statsTracker);
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
return completedCheckpoint;
@@ -2041,7 +2042,7 @@ public class CheckpointCoordinator {
checkpointsCleaner,
this::scheduleTriggerRequest,
executor,
- getStatsCallback(pendingCheckpoint));
+ statsTracker);
failureManager.handleCheckpointException(
pendingCheckpoint,
@@ -2180,11 +2181,15 @@ public class CheckpointCoordinator {
SKIP;
}
- private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
+ private PendingCheckpointStats trackPendingCheckpointStats(
+ long checkpointId,
+ CheckpointPlan checkpointPlan,
+ CheckpointProperties props,
+ long checkpointTimestamp) {
Map<JobVertexID, Integer> vertices =
Stream.concat(
- checkpoint.getCheckpointPlan().getTasksToWaitFor().stream(),
- checkpoint.getCheckpointPlan().getFinishedTasks().stream())
+ checkpointPlan.getTasksToWaitFor().stream(),
+ checkpointPlan.getFinishedTasks().stream())
.map(Execution::getVertex)
.map(ExecutionVertex::getJobVertex)
.distinct()
@@ -2195,13 +2200,11 @@ public class CheckpointCoordinator {
PendingCheckpointStats pendingCheckpointStats =
statsTracker.reportPendingCheckpoint(
- checkpoint.getCheckpointID(),
- checkpoint.getCheckpointTimestamp(),
- checkpoint.getProps(),
- vertices);
+ checkpointId, checkpointTimestamp, props, vertices);
+
+ reportFinishedTasks(pendingCheckpointStats, checkpointPlan.getFinishedTasks());
- reportFinishedTasks(
- pendingCheckpointStats, checkpoint.getCheckpointPlan().getFinishedTasks());
+ return pendingCheckpointStats;
}
private void reportFinishedTasks(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index c691614..f10a668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -160,12 +160,7 @@ public class CheckpointStatsTracker {
Map<JobVertexID, Integer> vertexToDop) {
PendingCheckpointStats pending =
- new PendingCheckpointStats(
- checkpointId,
- triggerTimestamp,
- props,
- vertexToDop,
- PendingCheckpointStatsCallback.proxyFor(this));
+ new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop);
statsReadWriteLock.lock();
try {
@@ -204,7 +199,7 @@ public class CheckpointStatsTracker {
*
* @param completed The completed checkpoint stats.
*/
- private void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+ void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
statsReadWriteLock.lock();
try {
latestCompletedCheckpoint = completed;
@@ -225,7 +220,7 @@ public class CheckpointStatsTracker {
*
* @param failed The failed checkpoint stats.
*/
- private void reportFailedCheckpoint(FailedCheckpointStats failed) {
+ void reportFailedCheckpoint(FailedCheckpointStats failed) {
statsReadWriteLock.lock();
try {
counts.incrementFailedCheckpoints();
@@ -276,38 +271,6 @@ public class CheckpointStatsTracker {
}
}
- /** Callback for finalization of a pending checkpoint. */
- interface PendingCheckpointStatsCallback {
- /**
- * Report a completed checkpoint.
- *
- * @param completed The completed checkpoint.
- */
- void reportCompletedCheckpoint(CompletedCheckpointStats completed);
-
- /**
- * Report a failed checkpoint.
- *
- * @param failed The failed checkpoint.
- */
- void reportFailedCheckpoint(FailedCheckpointStats failed);
-
- static PendingCheckpointStatsCallback proxyFor(
- CheckpointStatsTracker checkpointStatsTracker) {
- return new PendingCheckpointStatsCallback() {
- @Override
- public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
- checkpointStatsTracker.reportCompletedCheckpoint(completed);
- }
-
- @Override
- public void reportFailedCheckpoint(FailedCheckpointStats failed) {
- checkpointStatsTracker.reportFailedCheckpoint(failed);
- }
- };
- }
- }
-
// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 7d27d50..cc97510 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -215,7 +215,8 @@ public class Checkpoints {
checkpointProperties,
restoreMode == RestoreMode.CLAIM
? new ClaimModeCompletedStorageLocation(location)
- : location);
+ : location,
+ null);
}
private static void throwNonRestoredStateException(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1972427..841e543 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -104,8 +104,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
/** External pointer to the completed checkpoint (for example file path). */
private final String externalPointer;
- /** Optional stats tracker callback for discard. */
- @Nullable private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
+ /** Statistics of this completed checkpoint, used to mark it as discarded. */
+ @Nullable private final transient CompletedCheckpointStats completedCheckpointStats;
// ------------------------------------------------------------------------
@@ -117,7 +117,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
Map<OperatorID, OperatorState> operatorStates,
@Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props,
- CompletedCheckpointStorageLocation storageLocation) {
+ CompletedCheckpointStorageLocation storageLocation,
+ @Nullable CompletedCheckpointStats completedCheckpointStats) {
checkArgument(checkpointID >= 0);
checkArgument(timestamp >= 0);
@@ -140,6 +141,7 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
this.storageLocation = checkNotNull(storageLocation);
this.metadataHandle = storageLocation.getMetadataHandle();
this.externalPointer = storageLocation.getExternalPointer();
+ this.completedCheckpointStats = completedCheckpointStats;
}
// ------------------------------------------------------------------------
@@ -274,14 +276,19 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
} finally {
operatorStates.clear();
- // to be null-pointer safe, copy reference to stack
- CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
- if (discardCallback != null) {
- discardCallback.notifyDiscardedCheckpoint();
+ if (completedCheckpointStats != null) {
+ completedCheckpointStats.discard();
}
}
}
+ /** NOT thread-safe. This method may be called only from the CheckpointCoordinator thread. */
+ public void markAsDiscarded() {
+ if (completedCheckpointStats != null) {
+ completedCheckpointStats.discard();
+ }
+ }
+
public boolean shouldBeDiscardedOnSubsume() {
return props.discardOnSubsumed();
}
@@ -320,13 +327,9 @@ public class CompletedCheckpoint implements Serializable, Checkpoint {
return firstInterestingFields.equals(secondInterestingFields);
}
- /**
- * Sets the callback for tracking when this checkpoint is discarded.
- *
- * @param discardCallback Callback to call when the checkpoint is discarded.
- */
- void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
- this.discardCallback = discardCallback;
+ @Nullable
+ public CompletedCheckpointStats getStatistic() {
+ return completedCheckpointStats;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 38a5c9f..88339ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -183,28 +183,10 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
}
/**
- * Returns the callback for the {@link CompletedCheckpoint}.
- *
- * @return Callback for the {@link CompletedCheckpoint}.
- */
- DiscardCallback getDiscardCallback() {
- return new DiscardCallback();
- }
-
- /**
- * Callback for the {@link CompletedCheckpoint} instance to notify about disposal of the
- * checkpoint (most commonly when the checkpoint has been subsumed by a newer one).
+ * Marks the checkpoint as discarded.
*/
- class DiscardCallback {
-
- /**
- * Updates the discarded flag of the checkpoint stats.
- *
- * <p>After this notification, {@link #isDiscarded()} will return <code>true</code>.
- */
- void notifyDiscardedCheckpoint() {
- discarded = true;
- }
+ void discard() {
+ discarded = true;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index f9d3a0d..43a2a1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.PendingCheckpointStatsCallback;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
@@ -82,7 +81,6 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
totalSubtaskCount,
numAcknowledgedSubtasks,
taskStats,
- FAILING_REPORT_CALLBACK,
checkpointedSize,
stateSize,
processedData,
@@ -122,19 +120,4 @@ public class FailedCheckpointStats extends PendingCheckpointStats {
public String getFailureMessage() {
return failureMsg;
}
-
- private static final PendingCheckpointStatsCallback FAILING_REPORT_CALLBACK =
- new PendingCheckpointStatsCallback() {
- @Override
- public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
- throw new UnsupportedOperationException(
- "Failed checkpoint stats can't be completed");
- }
-
- @Override
- public void reportFailedCheckpoint(FailedCheckpointStats failed) {
- throw new UnsupportedOperationException(
- "Failed checkpoint stats can't be failed");
- }
- };
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 8521722..aee95dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -111,6 +111,8 @@ public class PendingCheckpoint implements Checkpoint {
/** The promise to fulfill once the checkpoint has been completed. */
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
+ @Nullable private final PendingCheckpointStats pendingCheckpointStats;
+
private int numAcknowledgedTasks;
private boolean disposed;
@@ -132,8 +134,8 @@ public class PendingCheckpoint implements Checkpoint {
Collection<String> masterStateIdentifiers,
CheckpointProperties props,
CheckpointStorageLocation targetLocation,
- CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
-
+ CompletableFuture<CompletedCheckpoint> onCompletionPromise,
+ @Nullable PendingCheckpointStats pendingCheckpointStats) {
checkArgument(
checkpointPlan.getTasksToWaitFor().size() > 0,
"Checkpoint needs at least one vertex that commits the checkpoint");
@@ -163,6 +165,7 @@ public class PendingCheckpoint implements Checkpoint {
: new HashSet<>(operatorCoordinatorsToConfirm);
this.acknowledgedTasks = new HashSet<>(checkpointPlan.getTasksToWaitFor().size());
this.onCompletionPromise = checkNotNull(onCompletionPromise);
+ this.pendingCheckpointStats = pendingCheckpointStats;
}
// --------------------------------------------------------------------------------------------
@@ -301,7 +304,7 @@ public class PendingCheckpoint implements Checkpoint {
CheckpointsCleaner checkpointsCleaner,
Runnable postCleanup,
Executor executor,
- @Nullable PendingCheckpointStats statsCallback)
+ CheckpointStatsTracker statsTracker)
throws IOException {
synchronized (lock) {
@@ -334,26 +337,24 @@ public class PendingCheckpoint implements Checkpoint {
operatorStates,
masterStates,
props,
- finalizedLocation);
+ finalizedLocation,
+ toCompletedCheckpointStats(finalizedLocation));
- onCompletionPromise.complete(completed);
-
- if (statsCallback != null) {
+ CompletedCheckpointStats completedCheckpointStats = completed.getStatistic();
+ if (completedCheckpointStats != null) {
LOG.trace(
"Checkpoint {} size: {}Kb, duration: {}ms",
checkpointId,
- statsCallback.getStateSize() == 0
+ completedCheckpointStats.getStateSize() == 0
? 0
- : statsCallback.getStateSize() / 1024,
- statsCallback.getEndToEndDuration());
- // Finalize the statsCallback and give the completed checkpoint a
- // callback for discards.
- CompletedCheckpointStats.DiscardCallback discardCallback =
- statsCallback.reportCompletedCheckpoint(
- finalizedLocation.getExternalPointer());
- completed.setDiscardCallback(discardCallback);
+ : completedCheckpointStats.getStateSize() / 1024,
+ completedCheckpointStats.getEndToEndDuration());
+
+ statsTracker.reportCompletedCheckpoint(completedCheckpointStats);
}
+ onCompletionPromise.complete(completed);
+
// mark this pending checkpoint as disposed, but do NOT drop the state
dispose(false, checkpointsCleaner, postCleanup, executor);
@@ -366,6 +367,15 @@ public class PendingCheckpoint implements Checkpoint {
}
}
+ @Nullable
+ private CompletedCheckpointStats toCompletedCheckpointStats(
+ CompletedCheckpointStorageLocation finalizedLocation) {
+ return pendingCheckpointStats != null
+ ? pendingCheckpointStats.toCompletedCheckpointStats(
+ finalizedLocation.getExternalPointer())
+ : null;
+ }
+
/**
* Acknowledges the task with the given execution attempt id and the given subtask state.
*
@@ -377,8 +387,7 @@ public class PendingCheckpoint implements Checkpoint {
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
- CheckpointMetrics metrics,
- @Nullable PendingCheckpointStats statsCallback) {
+ CheckpointMetrics metrics) {
synchronized (lock) {
if (disposed) {
@@ -415,7 +424,7 @@ public class PendingCheckpoint implements Checkpoint {
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
- if (statsCallback != null) {
+ if (pendingCheckpointStats != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
long checkpointStartDelayMillis =
@@ -443,10 +452,12 @@ public class PendingCheckpoint implements Checkpoint {
subtaskStateStats.getStateSize() == 0
? 0
: subtaskStateStats.getStateSize() / 1024,
- subtaskStateStats.getEndToEndDuration(statsCallback.getTriggerTimestamp()),
+ subtaskStateStats.getEndToEndDuration(
+ pendingCheckpointStats.getTriggerTimestamp()),
subtaskStateStats.getSyncCheckpointDuration(),
subtaskStateStats.getAsyncCheckpointDuration());
- statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
+ pendingCheckpointStats.reportSubtaskStats(
+ vertex.getJobvertexId(), subtaskStateStats);
}
return TaskAcknowledgeResult.SUCCESS;
@@ -541,11 +552,11 @@ public class PendingCheckpoint implements Checkpoint {
CheckpointsCleaner checkpointsCleaner,
Runnable postCleanup,
Executor executor,
- PendingCheckpointStats statsCallback) {
+ CheckpointStatsTracker statsTracker) {
try {
failureCause = new CheckpointException(reason, cause);
onCompletionPromise.completeExceptionally(failureCause);
- reportFailedCheckpoint(failureCause, statsCallback);
+ reportFailedCheckpoint(statsTracker, failureCause);
assertAbortSubsumedForced(reason);
} finally {
dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -628,11 +639,11 @@ public class PendingCheckpoint implements Checkpoint {
*
* @param cause The failure cause or <code>null</code>.
*/
- private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
+ private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
// to prevent null-pointers from concurrent modification, copy reference onto stack
- if (statsCallback != null) {
- long failureTimestamp = System.currentTimeMillis();
- statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
+ if (pendingCheckpointStats != null) {
+ statsTracker.reportFailedCheckpoint(
+ pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index a575cf6..a8d74f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.Map;
import static java.util.stream.Collectors.toConcurrentMap;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Statistics for a pending checkpoint that is still in progress.
@@ -45,9 +44,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
private static final long serialVersionUID = -973959257699390327L;
- /** Tracker callback when the pending checkpoint is finalized or aborted. */
- private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
-
/** The current number of acknowledged subtasks. */
private volatile int currentNumAcknowledgedSubtasks;
@@ -70,14 +66,12 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
* @param triggerTimestamp Timestamp when the checkpoint was triggered.
* @param props Checkpoint properties of the checkpoint.
* @param taskStats Task stats for each involved operator.
- * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
*/
PendingCheckpointStats(
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
- Map<JobVertexID, Integer> taskStats,
- CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+ Map<JobVertexID, Integer> taskStats) {
this(
checkpointId,
triggerTimestamp,
@@ -87,8 +81,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
.collect(
toConcurrentMap(
Map.Entry::getKey,
- e -> new TaskStateStats(e.getKey(), e.getValue()))),
- trackerCallback);
+ e -> new TaskStateStats(e.getKey(), e.getValue()))));
}
/**
@@ -99,15 +92,13 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
* @param props Checkpoint properties of the checkpoint.
* @param totalSubtaskCount Total number of subtasks for the checkpoint.
* @param taskStats Task stats for each involved operator.
- * @param trackerCallback Callback for the {@link CheckpointStatsTracker}.
*/
PendingCheckpointStats(
long checkpointId,
long triggerTimestamp,
CheckpointProperties props,
int totalSubtaskCount,
- Map<JobVertexID, TaskStateStats> taskStats,
- CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) {
+ Map<JobVertexID, TaskStateStats> taskStats) {
this(
checkpointId,
triggerTimestamp,
@@ -115,7 +106,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
totalSubtaskCount,
0,
taskStats,
- trackerCallback,
0,
0,
0,
@@ -130,7 +120,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
int totalSubtaskCount,
int acknowledgedSubtaskCount,
Map<JobVertexID, TaskStateStats> taskStats,
- CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback,
long currentCheckpointedSize,
long currentStateSize,
long processedData,
@@ -138,7 +127,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
@Nullable SubtaskStateStats latestAcknowledgedSubtask) {
super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
- this.trackerCallback = checkNotNull(trackerCallback);
this.currentCheckpointedSize = currentCheckpointedSize;
this.currentStateSize = currentStateSize;
this.currentProcessedData = processedData;
@@ -220,31 +208,20 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
}
}
- /**
- * Reports a successfully completed pending checkpoint.
- *
- * @param externalPointer Optional external storage path if checkpoint was externalized.
- * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal.
- */
- CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(String externalPointer) {
- CompletedCheckpointStats completed =
- new CompletedCheckpointStats(
- checkpointId,
- triggerTimestamp,
- props,
- numberOfSubtasks,
- new HashMap<>(taskStats),
- currentNumAcknowledgedSubtasks,
- currentCheckpointedSize,
- currentStateSize,
- currentProcessedData,
- currentPersistedData,
- latestAcknowledgedSubtask,
- externalPointer);
-
- trackerCallback.reportCompletedCheckpoint(completed);
-
- return completed.getDiscardCallback();
+ CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) {
+ return new CompletedCheckpointStats(
+ checkpointId,
+ triggerTimestamp,
+ props,
+ numberOfSubtasks,
+ new HashMap<>(taskStats),
+ currentNumAcknowledgedSubtasks,
+ currentCheckpointedSize,
+ currentStateSize,
+ currentProcessedData,
+ currentPersistedData,
+ latestAcknowledgedSubtask,
+ externalPointer);
}
/**
@@ -253,24 +230,21 @@ public class PendingCheckpointStats extends AbstractCheckpointStats {
* @param failureTimestamp Timestamp of the failure.
* @param cause Optional cause of the failure.
*/
- void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
- FailedCheckpointStats failed =
- new FailedCheckpointStats(
- checkpointId,
- triggerTimestamp,
- props,
- numberOfSubtasks,
- new HashMap<>(taskStats),
- currentNumAcknowledgedSubtasks,
- currentCheckpointedSize,
- currentStateSize,
- currentProcessedData,
- currentPersistedData,
- failureTimestamp,
- latestAcknowledgedSubtask,
- cause);
-
- trackerCallback.reportFailedCheckpoint(failed);
+ FailedCheckpointStats toFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) {
+ return new FailedCheckpointStats(
+ checkpointId,
+ triggerTimestamp,
+ props,
+ numberOfSubtasks,
+ new HashMap<>(taskStats),
+ currentNumAcknowledgedSubtasks,
+ currentCheckpointedSize,
+ currentStateSize,
+ currentProcessedData,
+ currentPersistedData,
+ failureTimestamp,
+ latestAcknowledgedSubtask,
+ cause);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 3dfebab..ce90895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -296,7 +296,8 @@ public class CheckpointCoordinatorMasterHooksTest {
masterHookStates,
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
@@ -356,7 +357,8 @@ public class CheckpointCoordinatorMasterHooksTest {
masterHookStates,
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 417473e..3b83973 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -773,7 +773,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collections.<MasterState>emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
// set up the coordinator and validate the initial state
SharedStateRegistry sharedStateRegistry =
@@ -1089,7 +1090,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
completedCheckpoint, new CheckpointsCleaner(), () -> {});
@@ -1131,7 +1133,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
completedCheckpoint, new CheckpointsCleaner(), () -> {});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9297c54..2cfa183 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2821,7 +2821,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
Collections.<MasterState>emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation()),
+ new TestCompletedCheckpointStorageLocation(),
+ null),
new CheckpointsCleaner(),
() -> {});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 44696cb..6239892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -239,7 +239,8 @@ public class CheckpointStateRestoreTest {
Collections.<MasterState>emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
coord.getCheckpointStore()
.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
@@ -269,7 +270,8 @@ public class CheckpointStateRestoreTest {
Collections.<MasterState>emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
coord.getCheckpointStore()
.addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index c11e572..8e7b0bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -69,7 +69,7 @@ public class CheckpointStatsTrackerTest {
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
pending.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
- pending.reportCompletedCheckpoint(null);
+ tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
// History should be empty
@@ -118,7 +118,7 @@ public class CheckpointStatsTrackerTest {
completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
- completed1.reportCompletedCheckpoint(null);
+ tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null));
// Failed checkpoint
PendingCheckpointStats failed =
@@ -129,7 +129,7 @@ public class CheckpointStatsTrackerTest {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
vertexToDop);
- failed.reportFailedCheckpoint(12, null);
+ tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12, null));
// Completed savepoint
PendingCheckpointStats savepoint =
@@ -143,7 +143,7 @@ public class CheckpointStatsTrackerTest {
savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1));
savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2));
- savepoint.reportCompletedCheckpoint(null);
+ tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null));
// In Progress
PendingCheckpointStats inProgress =
@@ -242,7 +242,7 @@ public class CheckpointStatsTrackerTest {
assertEquals(snapshot2, tracker.createSnapshot());
// Complete checkpoint => new snapshot
- pending.reportCompletedCheckpoint(null);
+ tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot();
assertNotEquals(snapshot2, snapshot3);
@@ -422,7 +422,7 @@ public class CheckpointStatsTrackerTest {
assertTrue(pending.reportSubtaskStats(jobVertexID, subtaskStats));
- pending.reportCompletedCheckpoint(externalPath);
+ stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
// Verify completed checkpoint updated
assertEquals(Long.valueOf(1), numCheckpoints.getValue());
@@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest {
singletonMap(jobVertexID, 1));
long failureTimestamp = 1230123L;
- nextPending.reportFailedCheckpoint(failureTimestamp, null);
+ stats.reportFailedCheckpoint(nextPending.toFailedCheckpoint(failureTimestamp, null));
// Verify updated
assertEquals(Long.valueOf(2), numCheckpoints.getValue());
@@ -482,7 +482,7 @@ public class CheckpointStatsTrackerTest {
singletonMap(jobVertexID, 1));
thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
- thirdPending.reportCompletedCheckpoint(null);
+ stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null));
// Verify external path is "n/a", because internal checkpoint won't generate external path.
assertEquals("n/a", latestCompletedExternalPath.getValue());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 122773a..62e5479 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -307,7 +307,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
operatorGroupState,
null,
props,
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index c7cad82..3a29452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -63,7 +63,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
CompletedCheckpoint checkpoint2 =
new CompletedCheckpoint(
@@ -75,7 +76,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
checkpoints1.add(checkpoint1);
@@ -103,7 +105,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
CompletedCheckpoint checkpoint2 =
new CompletedCheckpoint(
@@ -115,7 +118,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
checkpoints1.add(checkpoint1);
@@ -145,7 +149,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
CompletedCheckpoint checkpoint2 =
new CompletedCheckpoint(
@@ -157,7 +162,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
checkpoints1.add(checkpoint1);
@@ -184,7 +190,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
CompletedCheckpoint checkpoint2 =
new CompletedCheckpoint(
@@ -196,7 +203,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
List<CompletedCheckpoint> checkpoints1 = new ArrayList<>();
checkpoints1.add(checkpoint1);
@@ -223,7 +231,8 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -254,7 +263,8 @@ public class CompletedCheckpointTest {
operatorStates,
Collections.emptyList(),
props,
- location);
+ location,
+ null);
SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
@@ -306,7 +316,8 @@ public class CompletedCheckpointTest {
new HashMap<>(operatorStates),
Collections.emptyList(),
retainProps,
- retainedLocation);
+ retainedLocation,
+ null);
checkpoint.discardOnShutdown(status);
@@ -333,7 +344,8 @@ public class CompletedCheckpointTest {
new HashMap<>(operatorStates),
Collections.emptyList(),
discardProps,
- discardLocation);
+ discardLocation,
+ null);
checkpoint.discardOnShutdown(status);
@@ -346,6 +358,23 @@ public class CompletedCheckpointTest {
/** Tests that the stats callbacks happen if the callback is registered. */
@Test
public void testCompletedCheckpointStatsCallbacks() throws Exception {
+ Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ JobVertexID jobVertexId = new JobVertexID();
+ taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+ CompletedCheckpointStats checkpointStats =
+ new CompletedCheckpointStats(
+ 1,
+ 0,
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ 1,
+ taskStats,
+ 1,
+ 1,
+ 1,
+ 1,
+ mock(SubtaskStateStats.class),
+ null);
CompletedCheckpoint completed =
new CompletedCheckpoint(
new JobID(),
@@ -356,14 +385,11 @@ public class CompletedCheckpointTest {
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
-
- CompletedCheckpointStats.DiscardCallback callback =
- mock(CompletedCheckpointStats.DiscardCallback.class);
- completed.setDiscardCallback(callback);
+ new TestCompletedCheckpointStorageLocation(),
+ checkpointStats);
completed.discardOnShutdown(JobStatus.FINISHED);
- verify(callback, times(1)).notifyDiscardedCheckpoint();
+ assertTrue(checkpointStats.isDiscarded());
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index ed36db9..064e26a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -418,7 +418,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
Collections.emptyMap(),
Collections.emptyList(),
props,
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
}
private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
index d7abd6e..82db6ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
@@ -58,7 +58,8 @@ public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger {
new HashMap<>(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
}
private static class FailingRetrievableStateHandle<T extends Serializable>
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 37680ba..e46f608 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -52,17 +52,9 @@ public class PendingCheckpointStatsTest {
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
- CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
- mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
-
PendingCheckpointStats pending =
new PendingCheckpointStats(
- checkpointId,
- triggerTimestamp,
- props,
- totalSubtaskCount,
- taskStats,
- callback);
+ checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
// Check initial state
assertEquals(checkpointId, pending.getCheckpointId());
@@ -129,8 +121,7 @@ public class PendingCheckpointStatsTest {
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
- CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
- mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+ CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
PendingCheckpointStats pending =
new PendingCheckpointStats(
@@ -139,8 +130,7 @@ public class PendingCheckpointStatsTest {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
- taskStats,
- callback);
+ taskStats);
// Report subtasks
for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -154,8 +144,7 @@ public class PendingCheckpointStatsTest {
// Report completed
String externalPath = "asdjkasdjkasd";
- CompletedCheckpointStats.DiscardCallback discardCallback =
- pending.reportCompletedCheckpoint(externalPath);
+ callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
ArgumentCaptor<CompletedCheckpointStats> args =
ArgumentCaptor.forClass(CompletedCheckpointStats.class);
@@ -166,7 +155,7 @@ public class PendingCheckpointStatsTest {
assertNotNull(completed);
assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
assertFalse(completed.isDiscarded());
- discardCallback.notifyDiscardedCheckpoint();
+ completed.discard();
assertTrue(completed.isDiscarded());
assertEquals(externalPath, completed.getExternalPath());
@@ -194,8 +183,7 @@ public class PendingCheckpointStatsTest {
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
- CheckpointStatsTracker.PendingCheckpointStatsCallback callback =
- mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class);
+ CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class);
long triggerTimestamp = 123123;
PendingCheckpointStats pending =
@@ -205,8 +193,7 @@ public class PendingCheckpointStatsTest {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(),
- taskStats,
- callback);
+ taskStats);
// Report subtasks
for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
@@ -220,7 +207,7 @@ public class PendingCheckpointStatsTest {
// Report failed
Exception cause = new Exception("test exception");
long failureTimestamp = 112211137;
- pending.reportFailedCheckpoint(failureTimestamp, cause);
+ callback.reportFailedCheckpoint(pending.toFailedCheckpoint(failureTimestamp, cause));
ArgumentCaptor<FailedCheckpointStats> args =
ArgumentCaptor.forClass(FailedCheckpointStats.class);
@@ -263,8 +250,7 @@ public class PendingCheckpointStatsTest {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
1337,
- taskStats,
- mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+ taskStats);
PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 05111d6..294c6fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
@@ -74,9 +73,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -218,7 +215,7 @@ public class PendingCheckpointTest {
future = pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertTrue(pending.areTasksFullyAcknowledged());
pending.finalizeCheckpoint(
new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
@@ -292,75 +289,6 @@ public class PendingCheckpointTest {
verify(state, times(1)).discardState();
}
- /** Tests that the stats callbacks happen if the callback is registered. */
- @Test
- public void testPendingCheckpointStatsCallbacks() throws Exception {
- {
- // Complete successfully
- PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending =
- createPendingCheckpoint(
- CheckpointProperties.forCheckpoint(
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
- verify(callback, times(1))
- .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
-
- pending.finalizeCheckpoint(
- new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
- verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
- }
-
- {
- // Fail subsumed
- PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending =
- createPendingCheckpoint(
- CheckpointProperties.forCheckpoint(
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
- abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
- verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
- }
-
- {
- // Fail subsumed
- PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending =
- createPendingCheckpoint(
- CheckpointProperties.forCheckpoint(
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
- abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
- verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
- }
-
- {
- // Fail subsumed
- PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending =
- createPendingCheckpoint(
- CheckpointProperties.forCheckpoint(
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
- abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
- verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
- }
-
- {
- // Fail subsumed
- PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
- PendingCheckpoint pending =
- createPendingCheckpoint(
- CheckpointProperties.forCheckpoint(
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
- abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
- verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
- }
- }
-
/**
* FLINK-5985.
*
@@ -373,7 +301,7 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
final OperatorState expectedState =
new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM);
Assert.assertEquals(
@@ -394,7 +322,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.acknowledgeTask(
- ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
+ ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
Assert.assertFalse(pending.getOperatorStates().isEmpty());
}
@@ -441,7 +369,7 @@ public class PendingCheckpointTest {
assertTrue(pending.areMasterStatesFullyAcknowledged());
assertFalse(pending.areTasksFullyAcknowledged());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertTrue(pending.areTasksFullyAcknowledged());
final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -494,7 +422,7 @@ public class PendingCheckpointTest {
assertTrue(pending.areMasterStatesFullyAcknowledged());
assertFalse(pending.areTasksFullyAcknowledged());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
assertTrue(pending.areTasksFullyAcknowledged());
final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -580,8 +508,7 @@ public class PendingCheckpointTest {
checkpoint.acknowledgeTask(
ACK_TASKS.get(0).getAttemptId(),
TaskStateSnapshot.FINISHED_ON_RESTORE,
- new CheckpointMetrics(),
- null);
+ new CheckpointMetrics());
assertThat(
recordCheckpointPlan.getReportedFinishedOnRestoreTasks(),
contains(ACK_TASKS.get(0).getVertex()));
@@ -595,8 +522,7 @@ public class PendingCheckpointTest {
checkpoint.acknowledgeTask(
ACK_TASKS.get(0).getAttemptId(),
new TaskStateSnapshot(10, true),
- new CheckpointMetrics(),
- null);
+ new CheckpointMetrics());
assertThat(
recordCheckpointPlan.getReportedOperatorsFinishedTasks(),
contains(ACK_TASKS.get(0).getVertex()));
@@ -648,7 +574,7 @@ public class PendingCheckpointTest {
Collections.emptyList(),
Executors.directExecutor());
- checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+ checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
return checkpoint;
}
@@ -718,7 +644,8 @@ public class PendingCheckpointTest {
masterStateIdentifiers,
props,
location,
- new CompletableFuture<>());
+ new CompletableFuture<>(),
+ null);
}
@SuppressWarnings("unchecked")
@@ -741,12 +668,7 @@ public class PendingCheckpointTest {
CheckpointFailureReason reason,
PendingCheckpointStats statsCallback) {
checkpoint.abort(
- reason,
- null,
- new CheckpointsCleaner(),
- () -> {},
- Executors.directExecutor(),
- statsCallback);
+ reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
}
private static final class QueueExecutor implements Executor {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index d36b321..9a584c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -107,7 +107,8 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation()) {
+ new TestCompletedCheckpointStorageLocation(),
+ null) {
@Override
public boolean discardOnSubsume() {
discardAttempted.countDown();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index ee4f46c..804d285 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -254,7 +254,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
Collections.emptyMap(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
// shouldn't fail despite the exception
store.addCheckpointAndSubsumeOldestOne(
checkpointToAdd,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 50bac5c..50a9700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -880,7 +880,8 @@ public class JobMasterTest extends TestLogger {
null,
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- new DummyCheckpointStorageLocation());
+ new DummyCheckpointStorageLocation(),
+ null);
final StandaloneCompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 641a918..dfc3b40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -150,7 +150,8 @@ public class SchedulerUtilsTest extends TestLogger {
singletonMap(operatorID, operatorState),
emptyList(),
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
- new TestCompletedCheckpointStorageLocation());
+ new TestCompletedCheckpointStorageLocation(),
+ null);
}
private IncrementalRemoteKeyedStateHandle buildIncrementalHandle(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
index f6db13c..cab4abe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -223,6 +223,7 @@ public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
new HashMap<>(),
null,
CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL),
- new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"));
+ new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"),
+ null);
}
}