You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/06 07:24:14 UTC

[flink] 02/04: [FLINK-17514] Let TaskCancelerWatchDog call TaskManagerActions.notifyFatalError with non-null cause

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd76b42228bde4f25969d65712f7799771fb91e9
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue May 5 11:44:41 2020 +0200

    [FLINK-17514] Let TaskCancelerWatchDog call TaskManagerActions.notifyFatalError with non-null cause
    
    To avoid a NullPointerException, TaskManagerActions.notifyFatalError must be called with non-null
    arguments. This commit makes TaskCancelerWatchDog pass a FlinkRuntimeException as the cause instead of null.
---
 .../org/apache/flink/runtime/taskmanager/Task.java |  4 +--
 .../apache/flink/runtime/taskmanager/TaskTest.java | 40 +++++++++-------------
 2 files changed, 19 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 689ada8..0b35df0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.WrappingRuntimeException;
@@ -1504,8 +1505,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
 
 				if (executerThread.isAlive()) {
 					String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds.";
-					log.error(msg);
-					taskManager.notifyFatalError(msg, null);
+					taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));
 				}
 			}
 			catch (Throwable t) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 41ae741..3ca2caa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -75,7 +75,9 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -798,12 +800,13 @@ public class TaskTest extends TestLogger {
 	 */
 	@Test
 	public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
-		final AwaitFatalErrorTaskManagerActions taskManagerActions =
-			new AwaitFatalErrorTaskManagerActions();
+		final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
+		final TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder()
+			.setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable))
+			.build();
 
 		final Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
-		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
+		config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10);
 
 		final Task task = createTaskBuilder()
 			.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
@@ -819,7 +822,8 @@ public class TaskTest extends TestLogger {
 			task.cancelExecution();
 
 			// wait for the notification of notifyFatalError
-			taskManagerActions.latch.await();
+			final Throwable fatalError = fatalErrorFuture.join();
+			assertThat(fatalError, is(notNullValue()));
 		} finally {
 			// Interrupt again to clean up Thread
 			triggerLatch.trigger();
@@ -833,8 +837,10 @@ public class TaskTest extends TestLogger {
 	 */
 	@Test
 	public void testFatalErrorOnCanceling() throws Exception {
-		final AwaitFatalErrorTaskManagerActions taskManagerActions =
-			new AwaitFatalErrorTaskManagerActions();
+		final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
+		final TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder()
+			.setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable))
+			.build();
 
 		final Configuration config = new Configuration();
 		config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
@@ -846,7 +852,8 @@ public class TaskTest extends TestLogger {
 			.setTaskManagerActions(taskManagerActions)
 			.build());
 
-		doThrow(OutOfMemoryError.class).when(task).cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null));
+		final Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class;
+		doThrow(fatalErrorType).when(task).cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null));
 
 		try {
 			task.startTaskThread();
@@ -856,9 +863,8 @@ public class TaskTest extends TestLogger {
 			task.cancelExecution();
 
 			// wait for the notification of notifyFatalError
-			taskManagerActions.latch.await();
-		} catch (Throwable t) {
-			fail("No exception is expected to be thrown by fatal error handling");
+			final Throwable fatalError = fatalErrorFuture.join();
+			assertThat(fatalError, instanceOf(fatalErrorType));
 		} finally {
 			triggerLatch.trigger();
 		}
@@ -1010,18 +1016,6 @@ public class TaskTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Customized TaskManagerActions that waits for a call of notifyFatalError.
-	 */
-	private static class AwaitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {
-		private final OneShotLatch latch = new OneShotLatch();
-
-		@Override
-		public void notifyFatalError(String message, Throwable cause) {
-			latch.trigger();
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  helper functions
 	// ------------------------------------------------------------------------