You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/20 10:54:51 UTC

[flink] branch master updated (32ed582 -> c77a686)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 32ed582  [FLINK-21213][task] Degrade log level to INFO when ignore to decline checkpoint as task not running
     new a502892  [hotfix][runtime] Fixes ArchivedExecutionGraphTest
     new ae2d06b  [hotfix][runtime] Fixes checkstyle errors
     new 7eac5c6  [hotfix][runtime] Adds failure cause to log message
     new c77a686  [FLINK-21187][runtime] Adds exception history

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/executiongraph/AccessExecution.java    |  13 +-
 .../executiongraph/AccessExecutionVertex.java      |   8 +-
 .../runtime/executiongraph/ArchivedExecution.java  |  18 ++-
 .../executiongraph/ArchivedExecutionVertex.java    |   9 +-
 .../flink/runtime/executiongraph/ErrorInfo.java    |  24 +++-
 .../flink/runtime/executiongraph/Execution.java    |  16 +--
 .../runtime/executiongraph/ExecutionVertex.java    |   9 +-
 .../failover/flip1/ExecutionFailureHandler.java    |  20 ++-
 .../failover/flip1/FailureHandlingResult.java      |  98 ++++++++++---
 .../rest/handler/job/JobExceptionsHandler.java     |   8 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  32 ++---
 .../flink/runtime/scheduler/SchedulerBase.java     |  47 +++++++
 .../org/apache/flink/runtime/taskmanager/Task.java |   4 +-
 .../executiongraph/ArchivedExecutionGraphTest.java |  31 ++++-
 .../executiongraph/ExecutionVertexCancelTest.java  |  11 +-
 .../ExecutionVertexDeploymentTest.java             |  31 +++--
 .../flip1/ExecutionFailureHandlerTest.java         |  12 +-
 .../failover/flip1/FailureHandlingResultTest.java  |  24 ++--
 .../rest/handler/job/JobExceptionsHandlerTest.java |   5 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 153 ++++++++++++++++++---
 20 files changed, 434 insertions(+), 139 deletions(-)


[flink] 01/04: [hotfix][runtime] Fixes ArchivedExecutionGraphTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a502892ac2aaace62d2a65f5bff36c04b878e816
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Feb 3 21:17:34 2021 +0100

    [hotfix][runtime] Fixes ArchivedExecutionGraphTest
    
    The previously used globalFailure only set the failureInfo on the ExecutionGraph
    level. I switched to updateTaskExecutionState, which will also set the
    failureInfo on the Execution level.
---
 .../runtime/executiongraph/ArchivedExecutionGraphTest.java | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index a96a147..60a0aec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -115,8 +116,17 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         runtimeGraph = scheduler.getExecutionGraph();
 
         scheduler.startScheduling();
-        scheduler.handleGlobalFailure(
-                new RuntimeException("This exception was thrown on purpose."));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        runtimeGraph
+                                .getAllExecutionVertices()
+                                .iterator()
+                                .next()
+                                .getCurrentExecutionAttempt()
+                                .getAttemptId(),
+                        ExecutionState.FAILED,
+                        new RuntimeException("Local failure")));
     }
 
     @Test


[flink] 02/04: [hotfix][runtime] Fixes checkstyle errors

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae2d06b56c14a10fce305f2083769961dda59692
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Jan 20 11:36:33 2021 +0100

    [hotfix][runtime] Fixes checkstyle errors
---
 .../org/apache/flink/runtime/executiongraph/AccessExecution.java     | 5 ++---
 .../apache/flink/runtime/executiongraph/AccessExecutionVertex.java   | 1 +
 .../org/apache/flink/runtime/executiongraph/ArchivedExecution.java   | 2 ++
 .../apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java | 4 +++-
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index ef1c3b8..5d31700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -15,15 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-/**
- * Common interface for the runtime {@link Execution and {@link ArchivedExecution}.
- */
+/** Common interface for the runtime {@link Execution} and {@link ArchivedExecution}. */
 public interface AccessExecution {
     /**
      * Returns the {@link ExecutionAttemptID} for this Execution.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
index 4833f54..ac230bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.execution.ExecutionState;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 28ffbd5..673640e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -25,6 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
 
+/** {@code ArchivedExecution} is a readonly representation of {@link Execution}. */
 public class ArchivedExecution implements AccessExecution, Serializable {
     private static final long serialVersionUID = 4817108757483345173L;
     // --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index cdc08c4..80a0faf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -25,6 +26,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
+/** {@code ArchivedExecutionVertex} is a readonly representation of {@link ExecutionVertex}. */
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
 
     private static final long serialVersionUID = -6708241535015028576L;
@@ -33,7 +35,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
 
     private final EvictingBoundedList<ArchivedExecution> priorExecutions;
 
-    /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+    /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
     private final String taskNameWithSubtask;
 
     private final ArchivedExecution currentExecution; // this field must never be null


[flink] 04/04: [FLINK-21187][runtime] Adds exception history

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c77a686c195d1742c276f4a9e75899c8b85377bb
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Mon Feb 15 13:35:29 2021 +0100

    [FLINK-21187][runtime] Adds exception history
    
    The initial implementation only collects the actual exception without
    considering any other failures that might happen concurrently.
    
    This closes #14798.
---
 .../runtime/executiongraph/AccessExecution.java    |   8 +-
 .../executiongraph/AccessExecutionVertex.java      |   7 +-
 .../runtime/executiongraph/ArchivedExecution.java  |  16 ++-
 .../executiongraph/ArchivedExecutionVertex.java    |   5 +-
 .../flink/runtime/executiongraph/ErrorInfo.java    |  24 +++-
 .../flink/runtime/executiongraph/Execution.java    |  16 +--
 .../runtime/executiongraph/ExecutionVertex.java    |   9 +-
 .../failover/flip1/ExecutionFailureHandler.java    |  20 ++-
 .../failover/flip1/FailureHandlingResult.java      |  98 ++++++++++---
 .../rest/handler/job/JobExceptionsHandler.java     |   8 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  32 ++---
 .../flink/runtime/scheduler/SchedulerBase.java     |  47 +++++++
 .../executiongraph/ArchivedExecutionGraphTest.java |  17 ++-
 .../executiongraph/ExecutionVertexCancelTest.java  |  11 +-
 .../ExecutionVertexDeploymentTest.java             |  31 +++--
 .../flip1/ExecutionFailureHandlerTest.java         |  12 +-
 .../failover/flip1/FailureHandlingResultTest.java  |  24 ++--
 .../rest/handler/job/JobExceptionsHandlerTest.java |   5 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    | 153 ++++++++++++++++++---
 19 files changed, 412 insertions(+), 131 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index 5d31700..b0da913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Optional;
+
 /** Common interface for the runtime {@link Execution} and {@link ArchivedExecution}. */
 public interface AccessExecution {
     /**
@@ -63,9 +65,11 @@ public interface AccessExecution {
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return an {@code Optional} of {@link ErrorInfo} containing the {@code Throwable} and the
+     *     time it was registered if an error occurred. If no error occurred an empty {@code
+     *     Optional} will be returned.
      */
-    String getFailureCauseAsString();
+    Optional<ErrorInfo> getFailureInfo();
 
     /**
      * Returns the timestamp for the given {@link ExecutionState}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
index ac230bb..72d4f61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 /** Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}. */
 public interface AccessExecutionVertex {
     /**
@@ -65,9 +67,10 @@ public interface AccessExecutionVertex {
      * Returns the exception that caused the job to fail. This is the first root exception that was
      * not recoverable and triggered job failure.
      *
-     * @return failure exception as a string, or {@code "(null)"}
+     * @return failure exception wrapped in an {@code Optional} of {@link ErrorInfo}, or an empty
+     *     {@link Optional} if no exception was caught.
      */
-    String getFailureCauseAsString();
+    Optional<ErrorInfo> getFailureInfo();
 
     /**
      * Returns the {@link TaskManagerLocation} for this execution vertex.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 673640e..16384a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -22,9 +22,11 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Optional;
 
 /** {@code ArchivedExecution} is a readonly representation of {@link Execution}. */
 public class ArchivedExecution implements AccessExecution, Serializable {
@@ -39,7 +41,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
 
     private final ExecutionState state;
 
-    private final String failureCause; // once assigned, never changes
+    @Nullable private final ErrorInfo failureInfo; // once assigned, never changes
 
     private final TaskManagerLocation assignedResourceLocation; // for the archived execution
 
@@ -59,7 +61,7 @@ public class ArchivedExecution implements AccessExecution, Serializable {
                 execution.getAttemptId(),
                 execution.getAttemptNumber(),
                 execution.getState(),
-                ExceptionUtils.stringifyException(execution.getFailureCause()),
+                execution.getFailureInfo().orElse(null),
                 execution.getAssignedResourceLocation(),
                 execution.getAssignedAllocationID(),
                 execution.getVertex().getParallelSubtaskIndex(),
@@ -72,14 +74,14 @@ public class ArchivedExecution implements AccessExecution, Serializable {
             ExecutionAttemptID attemptId,
             int attemptNumber,
             ExecutionState state,
-            String failureCause,
+            @Nullable ErrorInfo failureCause,
             TaskManagerLocation assignedResourceLocation,
             AllocationID assignedAllocationID,
             int parallelSubtaskIndex,
             long[] stateTimestamps) {
         this.userAccumulators = userAccumulators;
         this.ioMetrics = ioMetrics;
-        this.failureCause = failureCause;
+        this.failureInfo = failureCause;
         this.assignedResourceLocation = assignedResourceLocation;
         this.attemptNumber = attemptNumber;
         this.attemptId = attemptId;
@@ -123,8 +125,8 @@ public class ArchivedExecution implements AccessExecution, Serializable {
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return failureCause;
+    public Optional<ErrorInfo> getFailureInfo() {
+        return Optional.ofNullable(failureInfo);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 80a0faf..2a53e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.util.EvictingBoundedList;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Optional;
 
 /** {@code ArchivedExecutionVertex} is a readonly representation of {@link ExecutionVertex}. */
 public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
@@ -90,8 +91,8 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return currentExecution.getFailureCauseAsString();
+    public Optional<ErrorInfo> getFailureInfo() {
+        return currentExecution.getFailureInfo();
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index 0b2f3dd..cfbfc6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /** Simple container to hold an exception and the corresponding timestamp. */
@@ -35,7 +39,25 @@ public class ErrorInfo implements Serializable {
 
     private final long timestamp;
 
-    public ErrorInfo(Throwable exception, long timestamp) {
+    /**
+     * Instantiates an {@code ErrorInfo} to cover inconsistent behavior due to FLINK-21376.
+     *
+     * @param exception The error cause that might be {@code null}.
+     * @param timestamp The timestamp the error was noticed.
+     * @return an {@code ErrorInfo} containing a generic {@link FlinkException} in case of a
+     *     missing error cause.
+     */
+    public static ErrorInfo createErrorInfoWithNullableCause(
+            @Nullable Throwable exception, long timestamp) {
+        return new ErrorInfo(
+                exception != null
+                        ? exception
+                        : new FlinkException(
+                                "Unknown cause for Execution failure (this might be caused by FLINK-21376)."),
+                timestamp);
+    }
+
+    public ErrorInfo(@Nonnull Throwable exception, long timestamp) {
         Preconditions.checkNotNull(exception);
         Preconditions.checkArgument(timestamp > 0);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4e36c99..8b159de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
@@ -153,7 +152,8 @@ public class Execution
 
     private LogicalSlot assignedResource;
 
-    private Throwable failureCause; // once assigned, never changes
+    private Optional<ErrorInfo> failureCause =
+            Optional.empty(); // once an ErrorInfo is set, never changes
 
     /**
      * Information to restore the task on recovery, such as checkpoint id and task state snapshot.
@@ -317,13 +317,9 @@ public class Execution
                 : null;
     }
 
-    public Throwable getFailureCause() {
-        return failureCause;
-    }
-
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
+    public Optional<ErrorInfo> getFailureInfo() {
+        return failureCause;
     }
 
     @Override
@@ -1133,7 +1129,9 @@ public class Execution
         checkState(transitionState(current, FAILED, t));
 
         // success (in a manner of speaking)
-        this.failureCause = t;
+        this.failureCause =
+                Optional.of(
+                        ErrorInfo.createErrorInfoWithNullableCause(t, getStateTimestamp(FAILED)));
 
         updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a2516ff..1bc0fda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
-import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 
@@ -246,12 +245,8 @@ public class ExecutionVertex
     }
 
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
-    }
-
-    public Throwable getFailureCause() {
-        return currentExecution.getFailureCause();
+    public Optional<ErrorInfo> getFailureInfo() {
+        return currentExecution.getFailureInfo();
     }
 
     public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 6961b45..a3b99e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.throwable.ThrowableClassifier;
 import org.apache.flink.runtime.throwable.ThrowableType;
 import org.apache.flink.util.IterableUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -77,7 +79,10 @@ public class ExecutionFailureHandler {
     public FailureHandlingResult getFailureHandlingResult(
             ExecutionVertexID failedTask, Throwable cause) {
         return handleFailure(
-                cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause), false);
+                failedTask,
+                cause,
+                failoverStrategy.getTasksNeedingRestart(failedTask, cause),
+                false);
     }
 
     /**
@@ -90,6 +95,7 @@ public class ExecutionFailureHandler {
      */
     public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) {
         return handleFailure(
+                null,
                 cause,
                 IterableUtils.toStream(schedulingTopology.getVertices())
                         .map(SchedulingExecutionVertex::getId)
@@ -98,13 +104,16 @@ public class ExecutionFailureHandler {
     }
 
     private FailureHandlingResult handleFailure(
+            @Nullable final ExecutionVertexID failingExecutionVertexId,
             final Throwable cause,
             final Set<ExecutionVertexID> verticesToRestart,
             final boolean globalFailure) {
 
         if (isUnrecoverableError(cause)) {
             return FailureHandlingResult.unrecoverable(
-                    new JobException("The failure is not recoverable", cause), globalFailure);
+                    failingExecutionVertexId,
+                    new JobException("The failure is not recoverable", cause),
+                    globalFailure);
         }
 
         restartBackoffTimeStrategy.notifyFailure(cause);
@@ -112,9 +121,14 @@ public class ExecutionFailureHandler {
             numberOfRestarts++;
 
             return FailureHandlingResult.restartable(
-                    verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure);
+                    failingExecutionVertexId,
+                    cause,
+                    verticesToRestart,
+                    restartBackoffTimeStrategy.getBackoffTime(),
+                    globalFailure);
         } else {
             return FailureHandlingResult.unrecoverable(
+                    failingExecutionVertexId,
                     new JobException(
                             "Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
                     globalFailure);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 9212710..5782e26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -17,57 +17,88 @@
 
 package org.apache.flink.runtime.executiongraph.failover.flip1;
 
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Result containing the tasks to restart upon a task failure. Also contains the reason if the
- * failure is not recoverable(non-recoverable failure type or restarting suppressed by restart
- * strategy).
+ * Result containing the tasks to restart upon a task failure. Also contains the reason for the
+ * failure and the vertices to restart if the failure is recoverable (in contrast to non-recoverable
+ * failure type or restarting suppressed by restart strategy).
  */
 public class FailureHandlingResult {
 
-    /** Task vertices to restart to recover from the failure. */
+    /**
+     * Task vertices to restart to recover from the failure or {@code null} if the failure is not
+     * recoverable.
+     */
     private final Set<ExecutionVertexID> verticesToRestart;
 
     /** Delay before the restarting can be conducted. */
     private final long restartDelayMS;
 
-    /** Reason why the failure is not recoverable. */
-    private final Throwable error;
+    /**
+     * The {@link ExecutionVertexID} referring to the {@link ExecutionVertex} the failure is
+     * originating from or {@code null} if it's a global failure.
+     */
+    @Nullable private final ExecutionVertexID failingExecutionVertexId;
+
+    /** Failure reason. {@code @Nullable} because of FLINK-21376. */
+    @Nullable private final Throwable error;
 
-    /** True if the original failure was a global failure. * */
+    /** True if the original failure was a global failure. */
     private final boolean globalFailure;
 
     /**
      * Creates a result of a set of tasks to restart to recover from the failure.
      *
-     * @param verticesToRestart containing task vertices to restart to recover from the failure
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
+     *     ExecutionVertex} the failure is originating from. Passing {@code null} as a value
+     *     indicates that the failure was issued by Flink itself.
+     * @param cause the exception that caused this failure.
+     * @param verticesToRestart containing task vertices to restart to recover from the failure.
+     *     {@code null} indicates that the failure is not restartable.
      * @param restartDelayMS indicate a delay before conducting the restart
      */
     private FailureHandlingResult(
-            Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
+            @Nullable ExecutionVertexID failingExecutionVertexId,
+            @Nullable Throwable cause,
+            @Nullable Set<ExecutionVertexID> verticesToRestart,
+            long restartDelayMS,
+            boolean globalFailure) {
         checkState(restartDelayMS >= 0);
 
         this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
         this.restartDelayMS = restartDelayMS;
-        this.error = null;
+        this.failingExecutionVertexId = failingExecutionVertexId;
+        this.error = cause;
         this.globalFailure = globalFailure;
     }
 
     /**
      * Creates a result that the failure is not recoverable and no restarting should be conducted.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
+     *     ExecutionVertex} the failure is originating from. Passing {@code null} as a value
+     *     indicates that the failure was issued by Flink itself.
      * @param error reason why the failure is not recoverable
      */
-    private FailureHandlingResult(Throwable error, boolean globalFailure) {
+    private FailureHandlingResult(
+            @Nullable ExecutionVertexID failingExecutionVertexId,
+            @Nonnull Throwable error,
+            boolean globalFailure) {
         this.verticesToRestart = null;
         this.restartDelayMS = -1;
+        this.failingExecutionVertexId = failingExecutionVertexId;
         this.error = checkNotNull(error);
         this.globalFailure = globalFailure;
     }
@@ -101,16 +132,24 @@ public class FailureHandlingResult {
     }
 
     /**
+     * Returns an {@code Optional} with the {@link ExecutionVertexID} of the task causing this
+     * failure or an empty {@code Optional} if it was not caused by a specific task.
+     *
+     * @return The {@code ExecutionVertexID} of the causing task or an empty {@code Optional} if
+     *     no specific task caused the failure (e.g. a global failure).
+     */
+    public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
+        return Optional.ofNullable(failingExecutionVertexId);
+    }
+
+    /**
-     * Returns reason why the restarting cannot be conducted.
+     * Returns the cause of this failure, regardless of whether restarting is possible.
      *
-     * @return reason why the restarting cannot be conducted
+     * @return the failure cause; only guaranteed to be non-null for unrecoverable failures
      */
+    @Nullable
     public Throwable getError() {
-        if (canRestart()) {
-            throw new IllegalStateException("Cannot get error when the restarting is accepted.");
-        } else {
-            return error;
-        }
+        return error;
     }
 
     /**
@@ -119,7 +158,7 @@ public class FailureHandlingResult {
      * @return whether the restarting can be conducted
      */
     public boolean canRestart() {
-        return error == null;
+        return verticesToRestart != null;
     }
 
     /**
@@ -136,13 +175,22 @@ public class FailureHandlingResult {
      * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather
      * than from the failure of an individual task.
      *
-     * @param verticesToRestart containing task vertices to restart to recover from the failure
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
+     *     ExecutionVertex} the failure is originating from. Passing {@code null} as a value
+     *     indicates that the failure was issued by Flink itself.
+     * @param cause the exception that caused this failure.
+     * @param verticesToRestart the task vertices to restart; must not be {@code null}
      * @param restartDelayMS indicate a delay before conducting the restart
      * @return result of a set of tasks to restart to recover from the failure
      */
     public static FailureHandlingResult restartable(
-            Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
-        return new FailureHandlingResult(verticesToRestart, restartDelayMS, globalFailure);
+            @Nullable ExecutionVertexID failingExecutionVertexId,
+            @Nonnull Throwable cause,
+            Set<ExecutionVertexID> verticesToRestart,
+            long restartDelayMS,
+            boolean globalFailure) {
+        return new FailureHandlingResult(
+                failingExecutionVertexId, cause, verticesToRestart, restartDelayMS, globalFailure);
     }
 
     /**
@@ -151,10 +199,16 @@ public class FailureHandlingResult {
      * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather
      * than from the failure of an individual task.
      *
+     * @param failingExecutionVertexId the {@link ExecutionVertexID} referring to the {@link
+     *     ExecutionVertex} the failure is originating from. Passing {@code null} as a value
+     *     indicates that the failure was issued by Flink itself.
      * @param error reason why the failure is not recoverable
      * @return result indicating the failure is not recoverable
      */
-    public static FailureHandlingResult unrecoverable(Throwable error, boolean globalFailure) {
-        return new FailureHandlingResult(error, globalFailure);
+    public static FailureHandlingResult unrecoverable(
+            @Nullable ExecutionVertexID failingExecutionVertexId,
+            @Nonnull Throwable error,
+            boolean globalFailure) {
+        return new FailureHandlingResult(failingExecutionVertexId, error, globalFailure);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index ca601a6..819f727 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,6 +44,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 
 /** Handler serving the job exceptions. */
@@ -109,8 +109,8 @@ public class JobExceptionsHandler
         List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
         boolean truncated = false;
         for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
-            String t = task.getFailureCauseAsString();
-            if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+            Optional<ErrorInfo> failure = task.getFailureInfo();
+            if (failure.isPresent()) {
                 if (taskExceptionList.size() >= exceptionToReportMaxSize) {
                     truncated = true;
                     break;
@@ -124,7 +124,7 @@ public class JobExceptionsHandler
                 long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
                 taskExceptionList.add(
                         new JobExceptionsInfo.ExecutionExceptionInfo(
-                                t,
+                                failure.get().getExceptionAsString(),
                                 task.getTaskNameWithSubtaskIndex(),
                                 locationString,
                                 timestamp == 0 ? -1 : timestamp));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index edd1da4..394d1cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -268,7 +268,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                 () ->
                         FutureUtils.assertNoException(
                                 cancelFuture.thenRunAsync(
-                                        restartTasks(executionVertexVersions, globalRecovery),
+                                        () -> {
+                                            // archive first: the failure info is read from the
+                                            // current execution attempt, which restartTasks()
+                                            // presumably replaces via resetForNewExecutions()
+                                            archiveFromFailureHandlingResult(failureHandlingResult);
+                                            restartTasks(executionVertexVersions, globalRecovery);
+                                        },
                                         getMainThreadExecutor())),
                 failureHandlingResult.getRestartDelayMS(),
                 TimeUnit.MILLISECONDS);
@@ -286,27 +292,34 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
         }
     }
 
-    private Runnable restartTasks(
+    /**
+     * Restarts the given vertices whose versions have not been modified in the meantime.
+     *
+     * <p>The vertices are removed from restart-pending, reset for new executions and get their
+     * state restored before the scheduling strategy restarts them. If restoring the state fails,
+     * the error is handled as a global failure and the vertices are not restarted here.
+     *
+     * @param executionVertexVersions the versions of the vertices to consider for the restart
+     * @param isGlobalRecovery forwarded to the state restoration
+     */
+    private void restartTasks(
             final Set<ExecutionVertexVersion> executionVertexVersions,
             final boolean isGlobalRecovery) {
-        return () -> {
-            final Set<ExecutionVertexID> verticesToRestart =
-                    executionVertexVersioner.getUnmodifiedExecutionVertices(
-                            executionVertexVersions);
+        final Set<ExecutionVertexID> verticesToRestart =
+                executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
-            removeVerticesFromRestartPending(verticesToRestart);
+        removeVerticesFromRestartPending(verticesToRestart);
 
-            resetForNewExecutions(verticesToRestart);
+        resetForNewExecutions(verticesToRestart);
 
-            try {
-                restoreState(verticesToRestart, isGlobalRecovery);
-            } catch (Throwable t) {
-                handleGlobalFailure(t);
-                return;
-            }
+        try {
+            restoreState(verticesToRestart, isGlobalRecovery);
+        } catch (Throwable t) {
+            handleGlobalFailure(t);
+            return;
+        }
 
-            schedulingStrategy.restartTasks(verticesToRestart);
-        };
+        schedulingStrategy.restartTasks(verticesToRestart);
     }
 
     private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index f5e03ec..64070ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
@@ -54,6 +55,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -96,6 +98,8 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -158,6 +162,9 @@ public abstract class SchedulerBase implements SchedulerNG {
 
     private final ComponentMainThreadExecutor mainThreadExecutor;
 
+    /** Archived task and global failures in the order they were recorded. */
+    private final List<ErrorInfo> taskFailureHistory = new ArrayList<>();
+
     public SchedulerBase(
             final Logger log,
             final JobGraph jobGraph,
@@ -516,6 +523,8 @@ public abstract class SchedulerBase implements SchedulerNG {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        // archived on termination, presumably once the FAILED status timestamp has been set
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));
     }
 
     protected final SchedulingTopology getSchedulingTopology() {
@@ -635,6 +644,64 @@ public abstract class SchedulerBase implements SchedulerNG {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    /**
+     * Archives a global failure with the {@link JobStatus#FAILED} status timestamp of the
+     * execution graph.
+     *
+     * @param failure the cause of the global failure; may be {@code null}
+     */
+    protected final void archiveGlobalFailure(@Nullable Throwable failure) {
+        archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    /**
+     * Adds a global failure to the exception history.
+     *
+     * @param failure the cause of the global failure; may be {@code null}
+     * @param timestamp the timestamp the failure is archived with
+     */
+    protected final void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) {
+        taskFailureHistory.add(ErrorInfo.createErrorInfoWithNullableCause(failure, timestamp));
+        log.debug("Archive global failure.", failure);
+    }
+
+    /**
+     * Adds the failure described by the given {@link FailureHandlingResult} to the exception
+     * history.
+     *
+     * <p>If the result refers to a failed task, the failure info of that task's current
+     * execution attempt is archived; nothing is archived if the attempt has no failure info.
+     * Otherwise, the result's error is archived as a global failure using the current time.
+     *
+     * @param failureHandlingResult the result of handling the failure to archive
+     */
+    protected final void archiveFromFailureHandlingResult(
+            FailureHandlingResult failureHandlingResult) {
+        final Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        if (executionOptional.isPresent()) {
+            final Execution failedExecution = executionOptional.get();
+            failedExecution
+                    .getFailureInfo()
+                    .ifPresent(
+                            failureInfo -> {
+                                taskFailureHistory.add(failureInfo);
+                                log.debug(
+                                        "Archive local failure causing attempt {} to fail: {}",
+                                        failedExecution.getAttemptId(),
+                                        failureInfo.getExceptionAsString());
+                            });
+        } else {
+            // fallback in case of a global failover - no failed state is set and, therefore, no
+            // timestamp was taken
+            archiveGlobalFailure(failureHandlingResult.getError(), System.currentTimeMillis());
+        }
+    }
+
     @Override
     public final boolean updateTaskExecutionState(
             final TaskExecutionStateTransition taskExecutionState) {
@@ -715,6 +782,12 @@ public abstract class SchedulerBase implements SchedulerNG {
     protected void notifyPartitionDataAvailableInternal(
             IntermediateResultPartitionID resultPartitionId) {}
 
+    /** Returns an unmodifiable view of the archived failures in the order they were added. */
+    @VisibleForTesting
+    protected List<ErrorInfo> getExceptionHistory() {
+        return Collections.unmodifiableList(taskFailureHistory);
+    }
+
     @Override
     public ArchivedExecutionGraph requestJob() {
         mainThreadExecutor.assertRunningInMainThread();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 60a0aec..52ab3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -363,8 +363,12 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(
                 runtimeVertex.getStateTimestamp(ExecutionState.FAILED),
                 archivedVertex.getStateTimestamp(ExecutionState.FAILED));
-        assertEquals(
-                runtimeVertex.getFailureCauseAsString(), archivedVertex.getFailureCauseAsString());
+        assertThat(
+                runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString),
+                is(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
+        assertThat(
+                runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp),
+                is(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp)));
         assertEquals(
                 runtimeVertex.getCurrentAssignedResourceLocation(),
                 archivedVertex.getCurrentAssignedResourceLocation());
@@ -384,9 +388,12 @@ public class ArchivedExecutionGraphTest extends TestLogger {
         assertEquals(
                 runtimeExecution.getAssignedResourceLocation(),
                 archivedExecution.getAssignedResourceLocation());
-        assertEquals(
-                runtimeExecution.getFailureCauseAsString(),
-                archivedExecution.getFailureCauseAsString());
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp)));
         assertEquals(
                 runtimeExecution.getStateTimestamp(ExecutionState.CREATED),
                 archivedExecution.getStateTimestamp(ExecutionState.CREATED));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index eb2a497..102000a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -43,7 +43,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.se
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -65,7 +64,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 
             assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
@@ -88,7 +87,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 
             assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
@@ -123,7 +122,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 
             assertFalse(slot.isAlive());
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
@@ -165,7 +164,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 
             assertFalse(slot.isAlive());
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
@@ -197,7 +196,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 
             assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 088ff45..9f60818 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -45,9 +45,11 @@ import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.StringContains.containsString;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -76,7 +78,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
                 // as expected
             }
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -107,7 +109,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
                 // as expected
             }
 
-            assertNull(vertex.getFailureCause());
+            assertFalse(vertex.getFailureInfo().isPresent());
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -172,8 +174,10 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
             vertex.deployToSlot(slot);
 
             assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-            assertNotNull(vertex.getFailureCause());
-            assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
+            assertTrue(vertex.getFailureInfo().isPresent());
+            assertThat(
+                    vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get(),
+                    containsString(ERROR_MESSAGE));
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -202,7 +206,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
             // wait until the state transition must be done
             for (int i = 0; i < 100; i++) {
                 if (vertex.getExecutionState() == ExecutionState.FAILED
-                        && vertex.getFailureCause() != null) {
+                        && vertex.getFailureInfo().isPresent()) {
                     break;
                 } else {
                     Thread.sleep(10);
@@ -210,8 +214,10 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
             }
 
             assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-            assertNotNull(vertex.getFailureCause());
-            assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
+            assertTrue(vertex.getFailureInfo().isPresent());
+            assertThat(
+                    vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get(),
+                    containsString(ERROR_MESSAGE));
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -241,7 +247,12 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
             vertex.fail(testError);
 
             assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-            assertEquals(testError, vertex.getFailureCause());
+            assertThat(
+                    vertex.getFailureInfo()
+                            .map(ErrorInfo::getException)
+                            .get()
+                            .deserializeError(ClassLoader.getSystemClassLoader()),
+                    is(testError));
 
             assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
             assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
index f2bf546..477e452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.core.IsSame;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -73,21 +75,17 @@ public class ExecutionFailureHandlerTest extends TestLogger {
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Exception cause = new Exception("test failure");
         // trigger a task failure
         final FailureHandlingResult result =
                 executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), new Exception("test failure"));
+                        new ExecutionVertexID(new JobVertexID(), 0), cause);
 
         // verify results
         assertTrue(result.canRestart());
         assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
         assertEquals(tasksToRestart, result.getVerticesToRestart());
-        try {
-            result.getError();
-            fail("Cannot get error when the restarting is accepted");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
+        assertThat(result.getError(), IsSame.sameInstance(cause));
         assertEquals(1, executionFailureHandler.getNumberOfRestarts());
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index 108e071..3ceba5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -27,8 +27,11 @@ import org.junit.Test;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.IsSame.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -39,31 +42,32 @@ public class FailureHandlingResultTest extends TestLogger {
     @Test
     public void testNormalFailureHandlingResult() {
         // create a normal FailureHandlingResult
+        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
         Set<ExecutionVertexID> tasks = new HashSet<>();
-        tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
+        tasks.add(executionVertexID);
         long delay = 1234;
-        FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay, false);
+        Throwable error = new RuntimeException();
+        FailureHandlingResult result =
+                FailureHandlingResult.restartable(executionVertexID, error, tasks, delay, false);
 
         assertTrue(result.canRestart());
         assertEquals(delay, result.getRestartDelayMS());
         assertEquals(tasks, result.getVerticesToRestart());
-        try {
-            result.getError();
-            fail("Cannot get error when the restarting is accepted");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
+        assertThat(result.getError(), sameInstance(error));
+        assertTrue(result.getExecutionVertexIdOfFailedTask().isPresent());
+        assertThat(result.getExecutionVertexIdOfFailedTask().get(), is(executionVertexID));
     }
 
     /** Tests FailureHandlingResult which suppresses restarts. */
     @Test
-    public void testRestartingSuppressedFailureHandlingResult() {
+    public void testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() {
         // create a FailureHandlingResult with error
         Throwable error = new Exception("test error");
-        FailureHandlingResult result = FailureHandlingResult.unrecoverable(error, false);
+        FailureHandlingResult result = FailureHandlingResult.unrecoverable(null, error, false);
 
         assertFalse(result.canRestart());
         assertEquals(error, result.getError());
+        assertFalse(result.getExecutionVertexIdOfFailedTask().isPresent());
         try {
             result.getVerticesToRestart();
             fail("get tasks to restart is not allowed when restarting is suppressed");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index d2145b7..bdbd8ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -123,7 +124,9 @@ public class JobExceptionsHandlerTest extends TestLogger {
                                     new ExecutionAttemptID(),
                                     attempt,
                                     expectedState,
-                                    "error",
+                                    new ErrorInfo(
+                                            new RuntimeException("error"),
+                                            System.currentTimeMillis()),
                                     assignedResourceLocation,
                                     allocationID,
                                     subtaskIndex,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 86174cd..737c69b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -59,6 +61,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
 
 import org.junit.After;
 import org.junit.Before;
@@ -84,9 +87,12 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -259,7 +265,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -275,8 +281,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
 
         final TaskExecutionState taskExecutionState =
-                new TaskExecutionState(
-                        jobGraph.getJobID(), new ExecutionAttemptID(), ExecutionState.FAILED);
+                createFailedTaskExecutionState(jobGraph.getJobID(), new ExecutionAttemptID());
 
         assertFalse(scheduler.updateTaskExecutionState(taskExecutionState));
     }
@@ -294,7 +299,7 @@ public class DefaultSchedulerTest extends TestLogger {
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
 
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -423,7 +428,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 sourceExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
         testRestartBackoffTimeStrategy.setCanRestart(false);
 
         testExecutionSlotAllocator.enableAutoCompletePendingRequests();
@@ -477,7 +482,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionAttemptID attemptId =
                 onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
 
         taskRestartExecutor.triggerScheduledTasks();
 
@@ -559,19 +564,15 @@ public class DefaultSchedulerTest extends TestLogger {
         // fail v1 and let it recover to SCHEDULED
         // the initial deployment of v1 will be outdated
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(
-                        jobGraph.getJobID(),
-                        v1.getCurrentExecutionAttempt().getAttemptId(),
-                        ExecutionState.FAILED));
+                createFailedTaskExecutionState(
+                        jobGraph.getJobID(), v1.getCurrentExecutionAttempt().getAttemptId()));
         taskRestartExecutor.triggerScheduledTasks();
 
         // fail v2 to get all pending slot requests in the initial deployments to be done
         // this triggers the outdated deployment of v1
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(
-                        jobGraph.getJobID(),
-                        v2.getCurrentExecutionAttempt().getAttemptId(),
-                        ExecutionState.FAILED));
+                createFailedTaskExecutionState(
+                        jobGraph.getJobID(), v2.getCurrentExecutionAttempt().getAttemptId()));
 
         // v1 should not be affected
         assertThat(sv1.getState(), is(equalTo(ExecutionState.SCHEDULED)));
@@ -600,7 +601,7 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(1)));
 
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
@@ -635,7 +636,7 @@ public class DefaultSchedulerTest extends TestLogger {
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         assertThat(masterHook.getRestoreCount(), is(equalTo(1)));
     }
@@ -672,7 +673,7 @@ public class DefaultSchedulerTest extends TestLogger {
         acknowledgePendingCheckpoint(scheduler, checkpointId);
 
         scheduler.updateTaskExecutionState(
-                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+                createFailedTaskExecutionState(jobGraph.getJobID(), attemptId));
         taskRestartExecutor.triggerScheduledTasks();
         final List<ExecutionVertexID> deployedExecutionVertices =
                 testExecutionVertexOperations.getDeployedVertices();
@@ -870,6 +871,124 @@ public class DefaultSchedulerTest extends TestLogger {
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
 
+    /**
+     * Verifies that an exception passed to {@code handleGlobalFailure} ends up as the only entry
+     * of the scheduler's exception history.
+     *
+     * <p>The entry's timestamp must lie between the moment right before the global failure is
+     * triggered and the moment right after the scheduled restart tasks were executed.
+     */
+    @Test
+    public void testExceptionHistoryWithGlobalFailOver() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID attemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final Exception expectedException = new Exception("Expected exception");
+        // lower bound of the time window the recorded failure timestamp must fall into
+        final long start = System.currentTimeMillis();
+        scheduler.handleGlobalFailure(expectedException);
+
+        // we have to cancel the task and trigger the restart to have the exception history
+        // populated
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        attemptId,
+                        ExecutionState.CANCELED,
+                        expectedException));
+        taskRestartExecutor.triggerScheduledTasks();
+        // upper bound of the time window, taken only after the restart has been executed
+        final long end = System.currentTimeMillis();
+
+        final List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+
+        assertThat(actualExceptionHistory, hasSize(1));
+
+        final ErrorInfo failure = actualExceptionHistory.get(0);
+        // Exception does not override equals(), so this match presumably relies on
+        // deserializeError returning the originally recorded instance - NOTE(review): confirm.
+        assertThat(
+                failure.getException().deserializeError(ClassLoader.getSystemClassLoader()),
+                is(expectedException));
+        assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start));
+        assertThat(failure.getTimestamp(), lessThanOrEqualTo(end));
+    }
+
+    /**
+     * Verifies that a restartable task failure followed by a non-restartable task failure are
+     * both recorded in the scheduler's exception history, in that order.
+     *
+     * <p>The first entry must be the original task exception. The second entry must be a {@link
+     * JobException} having the failing task exception in its cause chain. Each entry's timestamp
+     * must fall within the time window around the corresponding task state update.
+     */
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        final Range<Long> updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        // execute the pending restart before failing the (restarted) current attempt
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure: from now on the restart strategy refuses to restart
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        final Range<Long> updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        final List<ErrorInfo> actualExceptionHistory = scheduler.getExceptionHistory();
+        // hasSize reports the actual list content on mismatch (consistent with the sibling test)
+        assertThat(actualExceptionHistory, hasSize(2));
+
+        // assert restarted attempt
+        final ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt: the task exception is expected to be wrapped into a
+        // JobException rather than recorded directly
+        final ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        final Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, instanceOf(JobException.class));
+        assertThat(actualException, FlinkMatchers.containsCause(failingException));
+        assertThat(
+                globalFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+        assertThat(
+                globalFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+    }
+
+    private static TaskExecutionState createFailedTaskExecutionState(
+            JobID jobId, ExecutionAttemptID executionAttemptID) {
+        return new TaskExecutionState(
+                jobId,
+                executionAttemptID,
+                ExecutionState.FAILED,
+                new Exception("Expected failure cause"));
+    }
+
+    private static Range<Long> initiateFailure(
+            DefaultScheduler scheduler,
+            JobID jobId,
+            ExecutionAttemptID executionAttemptID,
+            Throwable exception) {
+        long start = System.currentTimeMillis();
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobId, executionAttemptID, ExecutionState.FAILED, exception));
+        return Range.closed(start, System.currentTimeMillis());
+    }
+
     private static JobVertex createVertex(String name, int parallelism) {
         final JobVertex v = new JobVertex(name);
         v.setParallelism(parallelism);


[flink] 03/04: [hotfix][runtime] Adds failure cause to log message

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7eac5c62a10158ef210906deb161ac791f18d3ae
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Mon Feb 15 11:21:35 2021 +0100

    [hotfix][runtime] Adds failure cause to log message
    
    The log message when transitioning to an execution state having a failure cause
    didn't print the cause itself. This is fixed now.
---
 .../src/main/java/org/apache/flink/runtime/taskmanager/Task.java      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4f9176c..a315683 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1039,12 +1039,12 @@ public class Task
                         newState);
             } else {
                 LOG.warn(
-                        "{} ({}) switched from {} to {}.",
+                        "{} ({}) switched from {} to {} with failure cause: {}",
                         taskNameWithSubtask,
                         executionId,
                         currentState,
                         newState,
-                        cause);
+                        ExceptionUtils.stringifyException(cause));
             }
 
             return true;