You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:10 UTC
[flink] 02/06: [hotfix] Throws the causing exception if a future is
completed exceptionally unexpectedly.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16fc743f580099737c3ab422d28856e2a38c313a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 23:14:07 2020 +0800
[hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.
---
.../flink/runtime/concurrent/FutureUtils.java | 12 +++++++
.../checkpoint/CheckpointCoordinatorTest.java | 37 +++++++++++-----------
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 05be10d..0dc79c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -1114,6 +1114,18 @@ public class FutureUtils {
executor);
}
+ /**
+ * Rethrows the failure of the given future via {@code future.get()} if it has completed exceptionally; otherwise does nothing.
+ *
+ * @param future the future to check; a future that is not yet done or that completed normally is left untouched.
+ * @throws Exception whatever {@code future.get()} throws for an exceptionally completed future (typically an {@code ExecutionException} wrapping the cause).
+ */
+ public static void throwIfCompletedExceptionally(CompletableFuture<?> future) throws Exception {
+ if (future.isCompletedExceptionally()) {
+ future.get();
+ }
+ }
+
private static <T> BiConsumer<T, Throwable> forwardTo(CompletableFuture<T> target) {
return (value, throwable) -> {
if (throwable != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 60cd9ce..3207164 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -265,7 +266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkPointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkPointFuture);
long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
@@ -346,7 +347,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
// validate that we have a pending checkpoint
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -434,12 +435,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture1.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
// trigger second checkpoint, should also succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture2.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
// validate that we have a pending checkpoint
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -548,7 +549,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
// validate that we have a pending checkpoint
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -707,7 +708,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture1.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -726,7 +727,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture2.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -833,7 +834,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture1.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -868,7 +869,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture2.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -990,7 +991,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger a checkpoint, partially acknowledged
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1053,7 +1054,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
@@ -1115,7 +1116,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -1374,11 +1375,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
- assertFalse(checkpointFuture1.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture2.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
long checkpointId2 = counter.getLast();
assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -1394,13 +1395,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture3.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
manuallyTriggeredScheduledExecutor.triggerAll();
long savepointId2 = counter.getLast();
- assertFalse(savepointFuture2.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
@@ -1736,7 +1737,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
CheckpointProperties props = checkpoint.getProps();
@@ -1963,7 +1964,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
- assertFalse(checkpointFuture.isCompletedExceptionally());
+ FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
verify(tracker, times(1))
.reportPendingCheckpoint(eq(1L), any(Long.class), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));