You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this version.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/05/06 07:24:14 UTC
[flink] 02/04: [FLINK-17514] Let TaskCancelerWatchDog call
TaskManagerActions.notifyFatalError with non-null cause
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd76b42228bde4f25969d65712f7799771fb91e9
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue May 5 11:44:41 2020 +0200
[FLINK-17514] Let TaskCancelerWatchDog call TaskManagerActions.notifyFatalError with non-null cause
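To avoid a NullPointerException (NPE), TaskManagerActions.notifyFatalError must be called with non-null arguments. This
commit therefore makes TaskCancelerWatchDog pass a FlinkRuntimeException carrying the timeout message as the cause, instead of null.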
---
.../org/apache/flink/runtime/taskmanager/Task.java | 4 +--
.../apache/flink/runtime/taskmanager/TaskTest.java | 40 +++++++++-------------
2 files changed, 19 insertions(+), 25 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 689ada8..0b35df0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.WrappingRuntimeException;
@@ -1504,8 +1505,7 @@ public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionPr
if (executerThread.isAlive()) {
String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds.";
- log.error(msg);
- taskManager.notifyFatalError(msg, null);
+ taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));
}
}
catch (Throwable t) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 41ae741..3ca2caa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -75,7 +75,9 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -798,12 +800,13 @@ public class TaskTest extends TestLogger {
*/
@Test
public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
- final AwaitFatalErrorTaskManagerActions taskManagerActions =
- new AwaitFatalErrorTaskManagerActions();
+ final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
+ final TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder()
+ .setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable))
+ .build();
final Configuration config = new Configuration();
- config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
- config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50);
+ config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10);
final Task task = createTaskBuilder()
.setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
@@ -819,7 +822,8 @@ public class TaskTest extends TestLogger {
task.cancelExecution();
// wait for the notification of notifyFatalError
- taskManagerActions.latch.await();
+ final Throwable fatalError = fatalErrorFuture.join();
+ assertThat(fatalError, is(notNullValue()));
} finally {
// Interrupt again to clean up Thread
triggerLatch.trigger();
@@ -833,8 +837,10 @@ public class TaskTest extends TestLogger {
*/
@Test
public void testFatalErrorOnCanceling() throws Exception {
- final AwaitFatalErrorTaskManagerActions taskManagerActions =
- new AwaitFatalErrorTaskManagerActions();
+ final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
+ final TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder()
+ .setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable))
+ .build();
final Configuration config = new Configuration();
config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5);
@@ -846,7 +852,8 @@ public class TaskTest extends TestLogger {
.setTaskManagerActions(taskManagerActions)
.build());
- doThrow(OutOfMemoryError.class).when(task).cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null));
+ final Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class;
+ doThrow(fatalErrorType).when(task).cancelOrFailAndCancelInvokableInternal(eq(ExecutionState.CANCELING), eq(null));
try {
task.startTaskThread();
@@ -856,9 +863,8 @@ public class TaskTest extends TestLogger {
task.cancelExecution();
// wait for the notification of notifyFatalError
- taskManagerActions.latch.await();
- } catch (Throwable t) {
- fail("No exception is expected to be thrown by fatal error handling");
+ final Throwable fatalError = fatalErrorFuture.join();
+ assertThat(fatalError, instanceOf(fatalErrorType));
} finally {
triggerLatch.trigger();
}
@@ -1010,18 +1016,6 @@ public class TaskTest extends TestLogger {
}
}
- /**
- * Customized TaskManagerActions that waits for a call of notifyFatalError.
- */
- private static class AwaitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {
- private final OneShotLatch latch = new OneShotLatch();
-
- @Override
- public void notifyFatalError(String message, Throwable cause) {
- latch.trigger();
- }
- }
-
// ------------------------------------------------------------------------
// helper functions
// ------------------------------------------------------------------------