You are viewing a plain-text version of this content. The canonical link to it (a hyperlink labelled "here" in the original page) is not preserved in this plain-text version.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/21 11:58:51 UTC

[flink] branch release-1.13 updated: [FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 1aeb7e3  [FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry
1aeb7e3 is described below

commit 1aeb7e3fab771d8ecfbd0efbdc5a60f4dea41b51
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri May 21 13:57:45 2021 +0200

    [FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry
---
 .../rest/handler/job/JobExceptionsHandler.java     |  3 --
 .../exceptionhistory/ExceptionHistoryEntry.java    |  3 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 27 ++++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    | 42 ++++++++++++++++++++++
 .../ExceptionHistoryEntryTest.java                 | 37 +++++++++++++++++++
 5 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c7de5d7..f9e0393 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -215,9 +215,6 @@ public class JobExceptionsHandler
         Preconditions.checkArgument(
                 exceptionHistoryEntry.getFailingTaskName() != null,
                 "The taskName must not be null for a non-global failure.");
-        Preconditions.checkArgument(
-                exceptionHistoryEntry.getTaskManagerLocation() != null,
-                "The location must not be null for a non-global failure.");
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
index 40b782d..acd0505 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
@@ -48,10 +48,13 @@ public class ExceptionHistoryEntry extends ErrorInfo {
      * @param failedExecution the failed {@code Execution}.
      * @param taskName the name of the task.
      * @return The {@code ExceptionHistoryEntry}.
+     * @throws NullPointerException if {@code null} is passed as one of the parameters.
      * @throws IllegalArgumentException if the passed {@code Execution} does not provide a {@link
      *     Execution#getFailureInfo() failureInfo}.
      */
     public static ExceptionHistoryEntry create(AccessExecution failedExecution, String taskName) {
+        Preconditions.checkNotNull(failedExecution, "No Execution is specified.");
+        Preconditions.checkNotNull(taskName, "No task name is specified.");
         Preconditions.checkArgument(
                 failedExecution.getFailureInfo().isPresent(),
                 "The selected Execution " + failedExecution.getAttemptId() + " didn't fail.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index ff8d8c3..7d81342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -164,6 +164,33 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     @Test
+    public void testWithLocalExceptionHistoryEntryNotHavingATaskManagerInformationAvailable()
+            throws HandlerRequestException {
+        final RootExceptionHistoryEntry failure =
+                new RootExceptionHistoryEntry(
+                        new RuntimeException("exception #1"),
+                        System.currentTimeMillis(),
+                        "task name",
+                        null,
+                        Collections.emptySet());
+
+        final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(failure);
+        final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        assertThat(
+                response.getExceptionHistory().getEntries(),
+                contains(
+                        historyContainsJobExceptionInfo(
+                                failure.getException(),
+                                failure.getTimestamp(),
+                                failure.getFailingTaskName(),
+                                JobExceptionsHandler.toString(failure.getTaskManagerLocation()))));
+    }
+
+    @Test
     public void testWithExceptionHistoryWithTruncationThroughParameter()
             throws HandlerRequestException {
         final RootExceptionHistoryEntry rootCause =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index dad9a41..3dfdac3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1132,6 +1132,48 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
+    public void testExceptionHistoryWithPreDeployFailure() {
+        // disable auto-completing slot requests to simulate timeout
+        executionSlotAllocatorFactory
+                .getTestExecutionSlotAllocator()
+                .disableAutoCompletePendingRequests();
+        final DefaultScheduler scheduler =
+                createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
+
+        executionSlotAllocatorFactory.getTestExecutionSlotAllocator().timeoutPendingRequests();
+
+        final ArchivedExecutionVertex taskFailureExecutionVertex =
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
+
+        // pending slot request timeout triggers a task failure that needs to be processed
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // sanity check that the TaskManagerLocation of the failed task is indeed null, as expected
+        assertThat(
+                taskFailureExecutionVertex.getCurrentAssignedResourceLocation(), is(nullValue()));
+
+        final ErrorInfo failureInfo =
+                taskFailureExecutionVertex
+                        .getFailureInfo()
+                        .orElseThrow(() -> new AssertionError("A failureInfo should be set."));
+
+        final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+                scheduler.getExceptionHistory();
+        assertThat(
+                actualExceptionHistory,
+                IsIterableContainingInOrder.contains(
+                        ExceptionHistoryEntryMatcher.matchesFailure(
+                                failureInfo.getException(),
+                                failureInfo.getTimestamp(),
+                                taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
+                                taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
+    }
+
+    @Test
     public void testExceptionHistoryConcurrentRestart() throws Exception {
         final JobGraph jobGraph = singleJobVertexJobGraph(2);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
index fa2225b..2b1422b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
@@ -36,6 +36,7 @@ import java.util.Optional;
 
 import static org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 /** {@code ExceptionHistoryEntryTest} tests the creation of {@link ExceptionHistoryEntry}. */
@@ -71,6 +72,42 @@ public class ExceptionHistoryEntryTest extends TestLogger {
                 "task name");
     }
 
+    @Test(expected = NullPointerException.class)
+    public void testNullExecution() {
+        ExceptionHistoryEntry.create(null, "task name");
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullTaskName() {
+        ExceptionHistoryEntry.create(
+                new TestAccessExecution(
+                        new ExecutionAttemptID(),
+                        new Exception("Expected failure"),
+                        System.currentTimeMillis(),
+                        new LocalTaskManagerLocation()),
+                null);
+    }
+
+    @Test
+    public void testWithMissingTaskManagerLocation() {
+        final Exception failure = new Exception("Expected failure");
+        final long timestamp = System.currentTimeMillis();
+        final String taskName = "task name";
+
+        final ExceptionHistoryEntry entry =
+                ExceptionHistoryEntry.create(
+                        new TestAccessExecution(new ExecutionAttemptID(), failure, timestamp, null),
+                        taskName);
+
+        assertThat(
+                entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                is(failure));
+        assertThat(entry.getTimestamp(), is(timestamp));
+        assertThat(entry.getFailingTaskName(), is(taskName));
+        assertThat(entry.getTaskManagerLocation(), is(nullValue()));
+        assertThat(entry.isGlobal(), is(false));
+    }
+
     private static class TestAccessExecution implements AccessExecution {
 
         private final ExecutionAttemptID executionAttemptID;