You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/21 11:58:51 UTC
[flink] branch release-1.13 updated: [FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 1aeb7e3 [FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry
1aeb7e3 is described below
commit 1aeb7e3fab771d8ecfbd0efbdc5a60f4dea41b51
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri May 21 13:57:45 2021 +0200
[FLINK-22688][coordination] Eases assertion on ExceptionHistoryEntry
---
.../rest/handler/job/JobExceptionsHandler.java | 3 --
.../exceptionhistory/ExceptionHistoryEntry.java | 3 ++
.../rest/handler/job/JobExceptionsHandlerTest.java | 27 ++++++++++++++
.../runtime/scheduler/DefaultSchedulerTest.java | 42 ++++++++++++++++++++++
.../ExceptionHistoryEntryTest.java | 37 +++++++++++++++++++
5 files changed, 109 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index c7de5d7..f9e0393 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -215,9 +215,6 @@ public class JobExceptionsHandler
Preconditions.checkArgument(
exceptionHistoryEntry.getFailingTaskName() != null,
"The taskName must not be null for a non-global failure.");
- Preconditions.checkArgument(
- exceptionHistoryEntry.getTaskManagerLocation() != null,
- "The location must not be null for a non-global failure.");
}
@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
index 40b782d..acd0505 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java
@@ -48,10 +48,13 @@ public class ExceptionHistoryEntry extends ErrorInfo {
* @param failedExecution the failed {@code Execution}.
* @param taskName the name of the task.
* @return The {@code ExceptionHistoryEntry}.
+ * @throws NullPointerException if {@code null} is passed as one of the parameters.
* @throws IllegalArgumentException if the passed {@code Execution} does not provide a {@link
* Execution#getFailureInfo() failureInfo}.
*/
public static ExceptionHistoryEntry create(AccessExecution failedExecution, String taskName) {
+ Preconditions.checkNotNull(failedExecution, "No Execution is specified.");
+ Preconditions.checkNotNull(taskName, "No task name is specified.");
Preconditions.checkArgument(
failedExecution.getFailureInfo().isPresent(),
"The selected Execution " + failedExecution.getAttemptId() + " didn't fail.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index ff8d8c3..7d81342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -164,6 +164,33 @@ public class JobExceptionsHandlerTest extends TestLogger {
}
@Test
+ public void testWithLocalExceptionHistoryEntryNotHavingATaskManagerInformationAvailable()
+ throws HandlerRequestException {
+ final RootExceptionHistoryEntry failure =
+ new RootExceptionHistoryEntry(
+ new RuntimeException("exception #1"),
+ System.currentTimeMillis(),
+ "task name",
+ null,
+ Collections.emptySet());
+
+ final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(failure);
+ final HandlerRequest<EmptyRequestBody, JobExceptionsMessageParameters> request =
+ createRequest(executionGraphInfo.getJobId(), 10);
+ final JobExceptionsInfoWithHistory response =
+ testInstance.handleRequest(request, executionGraphInfo);
+
+ assertThat(
+ response.getExceptionHistory().getEntries(),
+ contains(
+ historyContainsJobExceptionInfo(
+ failure.getException(),
+ failure.getTimestamp(),
+ failure.getFailingTaskName(),
+ JobExceptionsHandler.toString(failure.getTaskManagerLocation()))));
+ }
+
+ @Test
public void testWithExceptionHistoryWithTruncationThroughParameter()
throws HandlerRequestException {
final RootExceptionHistoryEntry rootCause =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index dad9a41..3dfdac3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -1132,6 +1132,48 @@ public class DefaultSchedulerTest extends TestLogger {
}
@Test
+ public void testExceptionHistoryWithPreDeployFailure() {
+ // disable auto-completing slot requests to simulate timeout
+ executionSlotAllocatorFactory
+ .getTestExecutionSlotAllocator()
+ .disableAutoCompletePendingRequests();
+ final DefaultScheduler scheduler =
+ createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
+
+ executionSlotAllocatorFactory.getTestExecutionSlotAllocator().timeoutPendingRequests();
+
+ final ArchivedExecutionVertex taskFailureExecutionVertex =
+ Iterables.getOnlyElement(
+ scheduler
+ .requestJob()
+ .getArchivedExecutionGraph()
+ .getAllExecutionVertices());
+
+ // pending slot request timeout triggers a task failure that needs to be processed
+ taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+ // sanity check that the TaskManagerLocation of the failed task is indeed null, as expected
+ assertThat(
+ taskFailureExecutionVertex.getCurrentAssignedResourceLocation(), is(nullValue()));
+
+ final ErrorInfo failureInfo =
+ taskFailureExecutionVertex
+ .getFailureInfo()
+ .orElseThrow(() -> new AssertionError("A failureInfo should be set."));
+
+ final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
+ scheduler.getExceptionHistory();
+ assertThat(
+ actualExceptionHistory,
+ IsIterableContainingInOrder.contains(
+ ExceptionHistoryEntryMatcher.matchesFailure(
+ failureInfo.getException(),
+ failureInfo.getTimestamp(),
+ taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(),
+ taskFailureExecutionVertex.getCurrentAssignedResourceLocation())));
+ }
+
+ @Test
public void testExceptionHistoryConcurrentRestart() throws Exception {
final JobGraph jobGraph = singleJobVertexJobGraph(2);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
index fa2225b..2b1422b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryTest.java
@@ -36,6 +36,7 @@ import java.util.Optional;
import static org.apache.flink.runtime.scheduler.exceptionhistory.ArchivedTaskManagerLocationMatcher.isArchivedTaskManagerLocation;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
/** {@code ExceptionHistoryEntryTest} tests the creation of {@link ExceptionHistoryEntry}. */
@@ -71,6 +72,42 @@ public class ExceptionHistoryEntryTest extends TestLogger {
"task name");
}
+ @Test(expected = NullPointerException.class)
+ public void testNullExecution() {
+ ExceptionHistoryEntry.create(null, "task name");
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullTaskName() {
+ ExceptionHistoryEntry.create(
+ new TestAccessExecution(
+ new ExecutionAttemptID(),
+ new Exception("Expected failure"),
+ System.currentTimeMillis(),
+ new LocalTaskManagerLocation()),
+ null);
+ }
+
+ @Test
+ public void testWithMissingTaskManagerLocation() {
+ final Exception failure = new Exception("Expected failure");
+ final long timestamp = System.currentTimeMillis();
+ final String taskName = "task name";
+
+ final ExceptionHistoryEntry entry =
+ ExceptionHistoryEntry.create(
+ new TestAccessExecution(new ExecutionAttemptID(), failure, timestamp, null),
+ taskName);
+
+ assertThat(
+ entry.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+ is(failure));
+ assertThat(entry.getTimestamp(), is(timestamp));
+ assertThat(entry.getFailingTaskName(), is(taskName));
+ assertThat(entry.getTaskManagerLocation(), is(nullValue()));
+ assertThat(entry.isGlobal(), is(false));
+ }
+
private static class TestAccessExecution implements AccessExecution {
private final ExecutionAttemptID executionAttemptID;