You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/09/09 01:17:10 UTC

[flink] 02/06: [hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.i

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16fc743f580099737c3ab422d28856e2a38c313a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 23:14:07 2020 +0800

    [hotfix] Throws the causing exception if a future is completed exceptionally unexpectedly.i
---
 .../flink/runtime/concurrent/FutureUtils.java      | 12 +++++++
 .../checkpoint/CheckpointCoordinatorTest.java      | 37 +++++++++++-----------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 05be10d..0dc79c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -1114,6 +1114,18 @@ public class FutureUtils {
 			executor);
 	}
 
+	/**
+	 * Throws the causing exception if the given future is completed exceptionally, otherwise do nothing.
+	 *
+	 * @param future the future to check.
+	 * @throws Exception when the future is completed exceptionally.
+	 */
+	public static void throwIfCompletedExceptionally(CompletableFuture<?> future) throws Exception {
+		if (future.isCompletedExceptionally()) {
+			future.get();
+		}
+	}
+
 	private static <T> BiConsumer<T, Throwable> forwardTo(CompletableFuture<T> target) {
 		return (value, throwable) -> {
 			if (throwable != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 60cd9ce..3207164 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -265,7 +266,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkPointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkPointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkPointFuture);
 
 			long checkpointId = checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);
@@ -346,7 +347,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -434,12 +435,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			// trigger second checkpoint, should also succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			// validate that we have a pending checkpoint
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -548,7 +549,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
@@ -707,7 +708,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -726,7 +727,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -833,7 +834,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger the first checkpoint. this should succeed
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture1.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -868,7 +869,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
 				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture2.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 
 			assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 			assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
@@ -990,7 +991,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			// trigger a checkpoint, partially acknowledged
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 			assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 			PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
@@ -1053,7 +1054,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
@@ -1115,7 +1116,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
@@ -1374,11 +1375,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
-		assertFalse(checkpointFuture1.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
 
 		CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture2.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
 		long checkpointId2 = counter.getLast();
 		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
@@ -1394,13 +1395,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		CompletableFuture<CompletedCheckpoint> checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture3.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture3);
 		assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		CompletableFuture<CompletedCheckpoint> savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir);
 		manuallyTriggeredScheduledExecutor.triggerAll();
 		long savepointId2 = counter.getLast();
-		assertFalse(savepointFuture2.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
 		assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
 		// 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint
@@ -1736,7 +1737,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			CompletableFuture<CompletedCheckpoint> checkpointFuture =
 				checkpointCoordinator.triggerCheckpoint(false);
 			manuallyTriggeredScheduledExecutor.triggerAll();
-			assertFalse(checkpointFuture.isCompletedExceptionally());
+			FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 			for (PendingCheckpoint checkpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
@@ -1963,7 +1964,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CompletableFuture<CompletedCheckpoint> checkpointFuture =
 			checkpointCoordinator.triggerCheckpoint(false);
 		manuallyTriggeredScheduledExecutor.triggerAll();
-		assertFalse(checkpointFuture.isCompletedExceptionally());
+		FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
 
 		verify(tracker, times(1))
 			.reportPendingCheckpoint(eq(1L), any(Long.class), eq(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)));