You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/30 21:15:06 UTC
[flink] 03/11: [hotfix][checkpointing] Improve exception in case
Coordinator State ack fails
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b144f13a2cc6dcebe38425ebac81a56cbb6e6f7d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 19:43:49 2020 +0200
[hotfix][checkpointing] Improve exception in case Coordinator State ack fails
---
.../runtime/checkpoint/OperatorCoordinatorCheckpoints.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index 2423bcb..39ac10f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -105,8 +105,14 @@ final class OperatorCoordinatorCheckpoints {
checkpoint.acknowledgeCoordinatorState(snapshot.coordinator, snapshot.state);
if (result != PendingCheckpoint.TaskAcknowledgeResult.SUCCESS) {
- throw new CheckpointException("Coordinator state not acknowledged successfully: " + result,
- CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+ final String errorMessage = "Coordinator state not acknowledged successfully: " + result;
+ final Throwable error = checkpoint.isDiscarded() ? checkpoint.getFailureCause() : null;
+
+ if (error != null) {
+ throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, error);
+ } else {
+ throw new CheckpointException(errorMessage, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE);
+ }
}
}
}