Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/29 17:24:14 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

tillrohrmann commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r566868391



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
##########
@@ -64,9 +64,10 @@
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return failure exception wrapped in a {@link ErrorInfo}, or {@code null} if no exception was
+     *     caught.
      */
-    String getFailureCauseAsString();
+    ErrorInfo getFailureInfo();

Review comment:
       Same here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
##########
@@ -64,9 +65,11 @@
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return {@link ErrorInfo} containing the {@code Throwable} wrapped in a {@link
+     *     SerializedThrowable} and the time it was registered, or {@code null} if no exception was
+     *     caught.
      */
-    String getFailureCauseAsString();
+    ErrorInfo getFailureInfo();

Review comment:
       If this method can return `null`, then I would suggest returning an `Optional<ErrorInfo>` instead. This makes the contract clearer.
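
A minimal sketch of the `Optional`-returning accessor, for illustration only. `ErrorInfoSketch` and `ExecutionSketch` are hypothetical stand-ins, not Flink's `ErrorInfo` or `Execution`; only the shape of the return type is the point.

```java
import java.util.Optional;

// Hypothetical stand-in for ErrorInfo: an exception string plus a timestamp.
final class ErrorInfoSketch {
    private final String exceptionAsString;
    private final long timestamp;

    ErrorInfoSketch(String exceptionAsString, long timestamp) {
        this.exceptionAsString = exceptionAsString;
        this.timestamp = timestamp;
    }

    String getExceptionAsString() {
        return exceptionAsString;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// Hypothetical stand-in for an execution that may or may not have failed.
final class ExecutionSketch {
    // null as long as no failure was recorded
    private final ErrorInfoSketch failureInfo;

    ExecutionSketch(ErrorInfoSketch failureInfo) {
        this.failureInfo = failureInfo;
    }

    // The "no failure" case is visible in the signature instead of the Javadoc.
    Optional<ErrorInfoSketch> getFailureInfo() {
        return Optional.ofNullable(failureInfo);
    }
}
```

With this shape a caller can write `execution.getFailureInfo().map(ErrorInfoSketch::getExceptionAsString)` and compare two such results directly, without a ternary `null` check on each side.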

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
##########
@@ -70,14 +69,14 @@ public ArchivedExecution(
             ExecutionAttemptID attemptId,
             int attemptNumber,
             ExecutionState state,
-            String failureCause,
+            ErrorInfo failureCause,

Review comment:
       `@Nullable` missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
##########
@@ -37,7 +36,7 @@
 
     private final ExecutionState state;
 
-    private final String failureCause; // once assigned, never changes
+    private final ErrorInfo failureInfo; // once assigned, never changes

Review comment:
       I guess that this field is `@Nullable`, right?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
##########
@@ -99,23 +103,31 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
     }
 
     private FailureHandlingResult handleFailure(
+            final ExecutionVertexID failingExecutionVertexId,

Review comment:
       I think `@Nullable` annotation is missing here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -322,8 +322,11 @@ public Throwable getFailureCause() {
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
+    public ErrorInfo getFailureInfo() {
+        return getFailureCause() == null
+                ? null
+                : new ErrorInfo(
+                        new SerializedThrowable(getFailureCause()), getStateTimestamp(FAILED));

Review comment:
       I think wrapping `getFailureCause` in a `SerializedThrowable` is not necessary because that's what `ErrorInfo` does in the constructor.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
##########
@@ -351,7 +351,19 @@ private static void compareExecutionVertex(
                 runtimeVertex.getStateTimestamp(ExecutionState.FAILED),
                 archivedVertex.getStateTimestamp(ExecutionState.FAILED));
         assertEquals(
-                runtimeVertex.getFailureCauseAsString(), archivedVertex.getFailureCauseAsString());
+                runtimeVertex.getFailureInfo() == null
+                        ? null
+                        : runtimeVertex.getFailureInfo().getExceptionAsString(),
+                archivedVertex.getFailureInfo() == null
+                        ? null
+                        : archivedVertex.getFailureInfo().getExceptionAsString());

Review comment:
       With `Optional<ErrorInfo>` this might look a bit nicer.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns the {@link ExecutionVertexID} of the task causing this failure.
+     *
+     * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+     */
+    public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
       `@Nullable` is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -38,6 +39,9 @@
     /** Delay before the restarting can be conducted. */
     private final long restartDelayMS;
 
+    /** The ExecutionVertexID refering to the ExecutionVertex the failure is originating from. */
+    private final ExecutionVertexID failingExecutionVertexId;

Review comment:
       I think `@Nullable` is missing here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -47,27 +51,39 @@
     /**
      * Creates a result of a set of tasks to restart to recover from the failure.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
+     *     ExecutionVertex} the failure is originating from.

Review comment:
       An explanation of what a `null` value means is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -267,9 +267,16 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
-                                cancelFuture.thenRunAsync(
-                                        restartTasks(executionVertexVersions, globalRecovery),
-                                        getMainThreadExecutor())),
+                                cancelFuture
+                                        .thenRunAsync(
+                                                () ->
+                                                        archiveFromFailureHandlingResult(
+                                                                failureHandlingResult),
+                                                getMainThreadExecutor())
+                                        .thenRunAsync(
+                                                restartTasks(
+                                                        executionVertexVersions, globalRecovery),
+                                                getMainThreadExecutor())),

Review comment:
       Why did you introduce a second `thenRunAsync`?
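
For context on the question, here is a sketch of the single-stage alternative, assuming the archiving step and the restart step are simply meant to run back to back on the same executor. `SingleStageSketch` and the step names are hypothetical; the real code may have had a reason for two stages that this sketch does not capture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class SingleStageSketch {

    // Runs "archive" and then "restart" inside one thenRunAsync stage and
    // returns the recorded order of the steps.
    static List<String> archiveThenRestart(
            CompletableFuture<Void> cancelFuture, Executor mainThreadExecutor) {
        List<String> steps = new ArrayList<>();
        cancelFuture
                .thenRunAsync(
                        () -> {
                            steps.add("archive"); // stand-in for the archiving call
                            steps.add("restart"); // stand-in for restartTasks(...).run()
                        },
                        mainThreadExecutor)
                .join();
        return steps;
    }
}
```

Both variants keep the ordering, since a dependent stage only starts after the previous one completes. The single stage just avoids one extra hop through the executor.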

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));

Review comment:
       One could use `FlinkMatchers.containsCause`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -641,6 +648,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        Set<ExecutionAttemptID> executionAttemptIds =
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        globalTaskFailures.put(
+                executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis()));
+
+        log.debug("Archive global {}: {}", failure, failure.getMessage());

Review comment:
       What would this log line look like? Wouldn't `failure.getMessage()` be duplicated? Maybe also say "Archive global failure...".
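
A small sketch of the duplication, assuming the logger fills the first `{}` with `failure.toString()`. It uses plain string concatenation instead of a logging framework, and `LogLineSketch` is a hypothetical name.

```java
final class LogLineSketch {

    // Mimics the pattern "Archive global {}: {}" applied to
    // (failure, failure.getMessage()).
    static String render(Throwable failure) {
        // Throwable.toString() already yields "<class name>: <message>".
        return "Archive global " + failure + ": " + failure.getMessage();
    }

    public static void main(String[] args) {
        // prints "Archive global java.lang.RuntimeException: boom: boom"
        System.out.println(render(new RuntimeException("boom")));
    }
}
```

Because `Throwable.toString()` includes the message, passing `failure.getMessage()` as a second argument repeats it.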

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -641,6 +648,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        Set<ExecutionAttemptID> executionAttemptIds =
+                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        globalTaskFailures.put(
+                executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis()));
+
+        log.debug("Archive global {}: {}", failure, failure.getMessage());
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        if (failureHandlingResult.getExecutionVertexIdOfFailedTask() == null) {

Review comment:
       I think it is nicer if you have something like `failureHandlingResult.isGlobalFailure()`. `failureHandlingResult.getExecutionVertexIdOfFailedTask() == null` is not very expressive.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
         }
     }
 
+    /**
+     * Returns the {@link ExecutionVertexID} of the task causing this failure.
+     *
+     * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+     */
+    public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
       Alternatively, you can introduce an `isLocalFailure()` method and let this method fail if `failingExecutionVertexId == null`. That way you enforce the contract that you can only ask for the failing execution vertex ID if it is a local failure.
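
A minimal sketch of that contract. `FailureHandlingResultSketch` is a hypothetical stand-in, a `String` replaces `ExecutionVertexID`, and the choice of `IllegalStateException` is an assumption.

```java
final class FailureHandlingResultSketch {

    // null means the failure is global and has no single originating vertex
    private final String failingExecutionVertexId;

    FailureHandlingResultSketch(String failingExecutionVertexId) {
        this.failingExecutionVertexId = failingExecutionVertexId;
    }

    boolean isLocalFailure() {
        return failingExecutionVertexId != null;
    }

    // Only valid for local failures; callers are expected to check
    // isLocalFailure() first.
    String getExecutionVertexIdOfFailedTask() {
        if (failingExecutionVertexId == null) {
            throw new IllegalStateException(
                    "A global failure has no failing execution vertex.");
        }
        return failingExecutionVertexId;
    }
}
```

Callers then branch on `isLocalFailure()` rather than comparing the getter's result with `null`, and the getter never hands out `null`.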

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -47,27 +51,39 @@
     /**
      * Creates a result of a set of tasks to restart to recover from the failure.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
+     *     ExecutionVertex} the failure is originating from.
+     * @param cause the exception that caused this failure.
      * @param verticesToRestart containing task vertices to restart to recover from the failure
      * @param restartDelayMS indicate a delay before conducting the restart
      */
     private FailureHandlingResult(
-            Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
+            ExecutionVertexID failingExecutionVertexId,

Review comment:
       `@Nullable` is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -164,6 +168,8 @@
     private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final Map<ExecutionAttemptID, ErrorInfo> localTaskFailures = new HashMap<>();
+    private final Map<Set<ExecutionAttemptID>, ErrorInfo> globalTaskFailures = new HashMap<>();

Review comment:
       What do we need the maps for here? Wouldn't a single `List<ErrorInfo>` be good enough?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();

Review comment:
       `System.currentTimeMillis()` can be susceptible to clock resets. Hence, it can cause some test instabilities. Unfortunately, `Execution` also uses this to determine the state timestamp.
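
One common way to make such timestamp assertions deterministic is to inject the time source. The sketch below assumes the component under test can accept one; `FailureRecorderSketch` is hypothetical, and as noted above, `Execution` would need the same treatment for this to help in the actual test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

final class FailureRecorderSketch {

    private final LongSupplier clock;
    private final List<Long> timestamps = new ArrayList<>();

    // Production code could pass System::currentTimeMillis; a test passes a
    // clock it controls.
    FailureRecorderSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Records the current time of the injected clock as the failure timestamp.
    void recordFailure() {
        timestamps.add(clock.getAsLong());
    }

    List<Long> getTimestamps() {
        return timestamps;
    }
}
```

A test can then back the clock with a mutable value, for example an `AtomicLong`, and assert exact timestamps instead of a range around two wall-clock readings.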



