You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2021/02/08 09:23:11 UTC
[flink] 02/02: [FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 35757ed3c11f32769d6d99882596f78f8b8e12b5
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Sat Feb 6 14:35:58 2021 +0100
[FLINK-21033][checkpointing] Remove PendingCheckpoint.statsCallback
---
.../runtime/checkpoint/CheckpointCoordinator.java | 37 +++++++++++------
.../runtime/checkpoint/CheckpointStatsTracker.java | 10 +++++
.../runtime/checkpoint/PendingCheckpoint.java | 30 +++++---------
.../runtime/checkpoint/PendingCheckpointTest.java | 48 +++++++++++++---------
4 files changed, 72 insertions(+), 53 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d4fa785..23ce6c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -717,7 +717,7 @@ public class CheckpointCoordinator {
checkpointStorageLocation,
onCompletionPromise);
- reportToStatsTracker(checkpoint, checkpointPlan.getTasksToWaitFor());
+ trackPendingCheckpointStats(checkpoint);
synchronized (lock) {
pendingCheckpoints.put(checkpointID, checkpoint);
@@ -1052,7 +1052,8 @@ public class CheckpointCoordinator {
switch (checkpoint.acknowledgeTask(
message.getTaskExecutionId(),
message.getSubtaskState(),
- message.getCheckpointMetrics())) {
+ message.getCheckpointMetrics(),
+ getStatsCallback(checkpoint))) {
case SUCCESS:
LOG.debug(
"Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
@@ -1173,7 +1174,11 @@ public class CheckpointCoordinator {
try {
completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint(
- checkpointsCleaner, this::scheduleTriggerRequest, executor);
+ checkpointsCleaner,
+ this::scheduleTriggerRequest,
+ executor,
+ getStatsCallback(pendingCheckpoint));
+
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
} catch (Exception e1) {
// abort the current pending checkpoint if we fails to finalize the pending
@@ -1908,7 +1913,8 @@ public class CheckpointCoordinator {
exception.getCause(),
checkpointsCleaner,
this::scheduleTriggerRequest,
- executor);
+ executor,
+ getStatsCallback(pendingCheckpoint));
if (pendingCheckpoint.getProps().isSavepoint()
&& pendingCheckpoint.getProps().isSynchronous()) {
@@ -2049,24 +2055,29 @@ public class CheckpointCoordinator {
SKIP;
}
- private void reportToStatsTracker(
- PendingCheckpoint checkpoint, Map<ExecutionAttemptID, ExecutionVertex> tasks) {
+ private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) {
if (statsTracker == null) {
return;
}
Map<JobVertexID, Integer> vertices =
- tasks.values().stream()
+ checkpoint.getCheckpointPlan().getTasksToWaitFor().values().stream()
.map(ExecutionVertex::getJobVertex)
.distinct()
.collect(
toMap(
ExecutionJobVertex::getJobVertexId,
ExecutionJobVertex::getParallelism));
- checkpoint.setStatsCallback(
- statsTracker.reportPendingCheckpoint(
- checkpoint.getCheckpointID(),
- checkpoint.getCheckpointTimestamp(),
- checkpoint.getProps(),
- vertices));
+ statsTracker.reportPendingCheckpoint(
+ checkpoint.getCheckpointID(),
+ checkpoint.getCheckpointTimestamp(),
+ checkpoint.getProps(),
+ vertices);
+ }
+
+ @Nullable
+ private PendingCheckpointStats getStatsCallback(PendingCheckpoint pendingCheckpoint) {
+ return statsTracker == null
+ ? null
+ : statsTracker.getPendingCheckpointStats(pendingCheckpoint.getCheckpointID());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index e8883dd..8c4ac97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -255,6 +255,16 @@ public class CheckpointStatsTracker {
}
}
+ public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
+ statsReadWriteLock.lock();
+ try {
+ AbstractCheckpointStats stats = history.getCheckpointById(checkpointId);
+ return stats instanceof PendingCheckpointStats ? (PendingCheckpointStats) stats : null;
+ } finally {
+ statsReadWriteLock.unlock();
+ }
+ }
+
public void reportIncompleteStats(
long checkpointId, ExecutionVertex vertex, CheckpointMetrics metrics) {
statsReadWriteLock.lock();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 6123e19..e44d2fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -116,9 +116,6 @@ public class PendingCheckpoint implements Checkpoint {
private boolean discarded;
- /** Optional stats tracker callback. */
- @Nullable private PendingCheckpointStats statsCallback;
-
private volatile ScheduledFuture<?> cancellerHandle;
private CheckpointException failureCause;
@@ -257,15 +254,6 @@ public class PendingCheckpoint implements Checkpoint {
}
/**
- * Sets the callback for tracking this pending checkpoint.
- *
- * @param trackerCallback Callback for collecting subtask stats.
- */
- void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
- this.statsCallback = trackerCallback;
- }
-
- /**
* Sets the handle for the canceller to this pending checkpoint. This method fails with an
* exception if a handle has already been set.
*
@@ -304,7 +292,10 @@ public class PendingCheckpoint implements Checkpoint {
}
public CompletedCheckpoint finalizeCheckpoint(
- CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor)
+ CheckpointsCleaner checkpointsCleaner,
+ Runnable postCleanup,
+ Executor executor,
+ @Nullable PendingCheckpointStats statsCallback)
throws IOException {
synchronized (lock) {
@@ -340,7 +331,6 @@ public class PendingCheckpoint implements Checkpoint {
onCompletionPromise.complete(completed);
// to prevent null-pointers from concurrent modification, copy reference onto stack
- PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
@@ -373,7 +363,8 @@ public class PendingCheckpoint implements Checkpoint {
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
- CheckpointMetrics metrics) {
+ CheckpointMetrics metrics,
+ @Nullable PendingCheckpointStats statsCallback) {
synchronized (lock) {
if (disposed) {
@@ -428,7 +419,6 @@ public class PendingCheckpoint implements Checkpoint {
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
- final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
@@ -518,11 +508,12 @@ public class PendingCheckpoint implements Checkpoint {
@Nullable Throwable cause,
CheckpointsCleaner checkpointsCleaner,
Runnable postCleanup,
- Executor executor) {
+ Executor executor,
+ PendingCheckpointStats statsCallback) {
try {
failureCause = new CheckpointException(reason, cause);
onCompletionPromise.completeExceptionally(failureCause);
- reportFailedCheckpoint(failureCause);
+ reportFailedCheckpoint(failureCause, statsCallback);
assertAbortSubsumedForced(reason);
} finally {
dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -605,9 +596,8 @@ public class PendingCheckpoint implements Checkpoint {
*
* @param cause The failure cause or <code>null</code>.
*/
- private void reportFailedCheckpoint(Exception cause) {
+ private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) {
// to prevent null-pointers from concurrent modification, copy reference onto stack
- final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
long failureTimestamp = System.currentTimeMillis();
statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 0ebad88..52be1bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -182,9 +182,10 @@ public class PendingCheckpointTest {
future = pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
assertTrue(pending.areTasksFullyAcknowledged());
- pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+ pending.finalizeCheckpoint(
+ new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
assertTrue(future.isDone());
// Finalize (missing ACKs)
@@ -194,7 +195,7 @@ public class PendingCheckpointTest {
assertFalse(future.isDone());
try {
pending.finalizeCheckpoint(
- new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+ new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
@@ -265,14 +266,13 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.setStatsCallback(callback);
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback);
verify(callback, times(1))
.reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class));
pending.finalizeCheckpoint(
- new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+ new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback);
verify(callback, times(1)).reportCompletedCheckpoint(any(String.class));
}
@@ -283,9 +283,8 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.setStatsCallback(callback);
- abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+ abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -296,9 +295,8 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.setStatsCallback(callback);
- abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
+ abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback);
verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -309,9 +307,8 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.setStatsCallback(callback);
- abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
+ abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback);
verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -322,9 +319,8 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.setStatsCallback(callback);
- abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED);
+ abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback);
verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
}
@@ -341,7 +337,7 @@ public class PendingCheckpointTest {
createPendingCheckpoint(
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
- pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
+ pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null);
Assert.assertTrue(pending.getOperatorStates().isEmpty());
}
@@ -359,7 +355,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.acknowledgeTask(
- ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class));
+ ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null);
Assert.assertFalse(pending.getOperatorStates().isEmpty());
}
@@ -406,7 +402,7 @@ public class PendingCheckpointTest {
assertTrue(pending.areMasterStatesFullyAcknowledged());
assertFalse(pending.areTasksFullyAcknowledged());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
assertTrue(pending.areTasksFullyAcknowledged());
final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -459,7 +455,7 @@ public class PendingCheckpointTest {
assertTrue(pending.areMasterStatesFullyAcknowledged());
assertFalse(pending.areTasksFullyAcknowledged());
- pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
assertTrue(pending.areTasksFullyAcknowledged());
final List<MasterState> resultMasterStates = pending.getMasterStates();
@@ -570,7 +566,7 @@ public class PendingCheckpointTest {
Collections.emptyList(),
Executors.directExecutor());
- checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics());
+ checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
return checkpoint;
}
@@ -634,8 +630,20 @@ public class PendingCheckpointTest {
}
private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
+ abort(checkpoint, reason, null);
+ }
+
+ private void abort(
+ PendingCheckpoint checkpoint,
+ CheckpointFailureReason reason,
+ PendingCheckpointStats statsCallback) {
checkpoint.abort(
- reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor());
+ reason,
+ null,
+ new CheckpointsCleaner(),
+ () -> {},
+ Executors.directExecutor(),
+ statsCallback);
}
private static final class QueueExecutor implements Executor {