You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/07 15:23:41 UTC

[flink] 01/02: [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 417d6d2070e7ff82eb73a605f12f50ca13acce15
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Apr 30 14:47:16 2019 +0200

    [FLINK-12219] Log uncaught exceptions and terminate in case Dispatcher#jobReachedGloballyTerminalState fails
    
    FutureUtils#assertNoException asserts that the given future does not complete
    exceptionally. If the future does complete exceptionally, now or at a later point,
    the failure is passed to the FatalExitExceptionHandler.
    
    This commit wraps the Dispatcher's job result callback in assertNoException so that a failure
    in Dispatcher#jobReachedGloballyTerminalState (or elsewhere in the callback) is not dropped.
    
    This closes #8334.
---
 .../flink/runtime/concurrent/FutureUtils.java      | 26 ++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 38 ++++++++-------
 .../flink/runtime/concurrent/FutureUtilsTest.java  | 56 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 1458eab..c1613c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -969,4 +970,29 @@ public class FutureUtils {
 			return DELAYER.schedule(runnable, delay, timeUnit);
 		}
 	}
+
+	/**
+	 * Asserts that the given {@link CompletableFuture} is not completed exceptionally. If the future
+	 * is completed exceptionally, then it will call the {@link FatalExitExceptionHandler}.
+	 *
+	 * @param completableFuture to assert for no exceptions
+	 */
+	public static void assertNoException(CompletableFuture<?> completableFuture) {
+		handleUncaughtException(completableFuture, FatalExitExceptionHandler.INSTANCE);
+	}
+
+	/**
+	 * Checks that the given {@link CompletableFuture} is not completed exceptionally. If the future
+	 * is completed exceptionally, then it will call the given uncaught exception handler.
+	 *
+	 * @param completableFuture to assert for no exceptions
+	 * @param uncaughtExceptionHandler to call if the future is completed exceptionally
+	 */
+	public static void handleUncaughtException(CompletableFuture<?> completableFuture, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+		checkNotNull(completableFuture).whenComplete((ignored, throwable) -> {
+			if (throwable != null) {
+				uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), throwable);
+			}
+		});
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1f1fc62..6f8f27d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -363,26 +363,32 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
 		final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
-		jobManagerRunner.getResultFuture().whenCompleteAsync(
-			(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
-				// check if we are still the active JobManagerRunner by checking the identity
-				//noinspection ObjectEquality
-				if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
-					if (archivedExecutionGraph != null) {
-						jobReachedGloballyTerminalState(archivedExecutionGraph);
-					} else {
-						final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
 
-						if (strippedThrowable instanceof JobNotFinishedException) {
-							jobNotFinished(jobId);
+		FutureUtils.assertNoException(
+			jobManagerRunner.getResultFuture().handleAsync(
+				(ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
+					// check if we are still the active JobManagerRunner by checking the identity
+					final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+					final JobManagerRunner currentJobManagerRunner = jobManagerRunnerFuture != null ? jobManagerRunnerFuture.getNow(null) : null;
+					//noinspection ObjectEquality
+					if (jobManagerRunner == currentJobManagerRunner) {
+						if (archivedExecutionGraph != null) {
+							jobReachedGloballyTerminalState(archivedExecutionGraph);
 						} else {
-							jobMasterFailed(jobId, strippedThrowable);
+							final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+							if (strippedThrowable instanceof JobNotFinishedException) {
+								jobNotFinished(jobId);
+							} else {
+								jobMasterFailed(jobId, strippedThrowable);
+							}
 						}
+					} else {
+						log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
 					}
-				} else {
-					log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
-				}
-			}, getMainThreadExecutor());
+
+					return null;
+				}, getMainThreadExecutor()));
 
 		jobManagerRunner.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index bfbd62e..e16771c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -650,4 +650,60 @@ public class FutureUtilsTest extends TestLogger {
 		Assert.assertFalse(runWithExecutor.get());
 		Assert.assertTrue(continuationFuture.isDone());
 	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithCompletedFuture() {
+		final CompletableFuture<String> future = CompletableFuture.completedFuture("foobar");
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithNormalCompletion() {
+		final CompletableFuture<String> future = new CompletableFuture<>();
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		future.complete("barfoo");
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
+		final CompletableFuture<String> future = FutureUtils.completedExceptionally(new FlinkException("foobar"));
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+	}
+
+	@Test
+	public void testHandleUncaughtExceptionWithExceptionallyCompletion() {
+		final CompletableFuture<String> future = new CompletableFuture<>();
+
+		final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+		FutureUtils.handleUncaughtException(future, uncaughtExceptionHandler);
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(false));
+		future.completeExceptionally(new FlinkException("barfoo"));
+		assertThat(uncaughtExceptionHandler.hasBeenCalled(), is(true));
+	}
+
+	private static class TestingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+		private Throwable exception = null;
+
+		@Override
+		public void uncaughtException(Thread t, Throwable e) {
+			exception = e;
+		}
+
+		private boolean hasBeenCalled() {
+			return exception != null;
+		}
+	}
 }