You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 17:10:21 UTC

[3/9] flink git commit: [FLINK-6043] [web] Display exception timestamp

[FLINK-6043] [web] Display exception timestamp

This closes #3583.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f91083b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f91083b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f91083b

Branch: refs/heads/master
Commit: 0f91083b2b497485b4375935ed7397583a03ca7c
Parents: 957cced
Author: zentol <ch...@apache.org>
Authored: Wed Mar 29 14:46:19 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 15:04:16 2017 +0200

----------------------------------------------------------------------
 .../handlers/JobExceptionsHandler.java          | 11 ++-
 .../handlers/JobExceptionsHandlerTest.java      |  5 +-
 .../utils/ArchivedExecutionGraphBuilder.java    |  7 +-
 .../utils/ArchivedJobGenerationUtils.java       |  3 +-
 .../app/partials/jobs/job.exceptions.jade       | 14 +++-
 .../executiongraph/AccessExecutionGraph.java    |  4 +-
 .../executiongraph/ArchivedExecutionGraph.java  |  6 +-
 .../flink/runtime/executiongraph/ErrorInfo.java | 85 ++++++++++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 56 +++++++++----
 .../ArchivedExecutionGraphTest.java             |  2 +-
 .../ExecutionGraphSuspendTest.java              |  2 +-
 .../query/AbstractQueryableStateITCase.java     |  2 +-
 12 files changed, 165 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 181b270..e31299b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -78,9 +80,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStartObject();
 
 		// most important is the root failure cause
-		String rootException = graph.getFailureCauseAsString();
-		if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
-			gen.writeStringField("root-exception", rootException);
+		ErrorInfo rootException = graph.getFailureCause();
+		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+			gen.writeStringField("root-exception", rootException.getExceptionAsString());
+			gen.writeNumberField("timestamp", rootException.getTimestamp());
 		}
 
 		// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
@@ -105,6 +108,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 				gen.writeStringField("exception", t);
 				gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
 				gen.writeStringField("location", locationString);
+				long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
+				gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp);
 				gen.writeEndObject();
 				numExceptionsSoFar++;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index f3df225..6016d01 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -71,7 +72,8 @@ public class JobExceptionsHandlerTest {
 	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
-		Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
 
 		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
 
@@ -81,6 +83,7 @@ public class JobExceptionsHandlerTest {
 				JsonNode exception = exceptions.get(x);
 
 				Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+				Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
 				Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
 
 				TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
index 935663c..57b300a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -47,7 +48,7 @@ public class ArchivedExecutionGraphBuilder {
 	private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
 	private long[] stateTimestamps;
 	private JobStatus state;
-	private String failureCause;
+	private ErrorInfo failureCause;
 	private String jsonPlan;
 	private StringifiedAccumulatorResult[] archivedUserAccumulators;
 	private ArchivedExecutionConfig archivedExecutionConfig;
@@ -85,7 +86,7 @@ public class ArchivedExecutionGraphBuilder {
 		return this;
 	}
 
-	public ArchivedExecutionGraphBuilder setFailureCause(String failureCause) {
+	public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
 		this.failureCause = failureCause;
 		return this;
 	}
@@ -126,7 +127,7 @@ public class ArchivedExecutionGraphBuilder {
 			verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
 			stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
 			state != null ? state : JobStatus.FINISHED,
-			failureCause != null ? failureCause : "(null)",
+			failureCause,
 			jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
 			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
 			serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index dee04f2..3e4fc01 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -132,7 +133,7 @@ public class ArchivedJobGenerationUtils {
 		originalJob = new ArchivedExecutionGraphBuilder()
 			.setJobID(new JobID())
 			.setTasks(tasks)
-			.setFailureCause("jobException")
+			.setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
 			.setState(JobStatus.FINISHED)
 			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
 			.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
index 1aa10b0..eee19f5 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
@@ -20,6 +20,12 @@
     .panel-title
       | Root exception
 
+  .panel-heading.clearfix
+    .panel-info.thin.last(ng-if="exceptions['timestamp'] != -1")
+      strong Timestamp:
+      = ' '
+      | {{ exceptions['timestamp'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
+
   .panel-body
     pre.exception
       | {{ exceptions['root-exception'] }}
@@ -30,8 +36,14 @@
       | {{ exception.task }}
 
   .panel-heading.clearfix
+    .panel-info.thin(ng-if="exception.timestamp != -1")
+      strong Timestamp:
+      = ' '
+      | {{ exception.timestamp | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
     .panel-info.thin.last
-      span {{ exception.location }}
+      strong Location:
+      = ' '
+      | {{ exception.location }}
 
   .panel-body
     pre.exception

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 3b064c3..df4ed2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -66,9 +66,9 @@ public interface AccessExecutionGraph {
 	 * Returns the exception that caused the job to fail. This is the first root exception
 	 * that was not recoverable and triggered job failure.
 	 *
-	 * @return failure causing exception as a string, or {@code "(null)"}
+	 * @return failure causing exception, or null
 	 */
-	String getFailureCauseAsString();
+	ErrorInfo getFailureCause();
 
 	/**
 	 * Returns the job vertex for the given {@link JobVertexID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index b9db1e7..901b973 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -71,7 +71,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	 * The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure
 	 */
-	private final String failureCause;
+	private final ErrorInfo failureCause;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private final String jsonPlan;
@@ -93,7 +93,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			List<ArchivedExecutionJobVertex> verticesInCreationOrder,
 			long[] stateTimestamps,
 			JobStatus state,
-			String failureCause,
+			ErrorInfo failureCause,
 			String jsonPlan,
 			StringifiedAccumulatorResult[] archivedUserAccumulators,
 			Map<String, SerializedValue<Object>> serializedUserAccumulators,
@@ -141,7 +141,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public String getFailureCauseAsString() {
+	public ErrorInfo getFailureCause() {
 		return failureCause;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
new file mode 100644
index 0000000..d919bfa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Simple container to hold an exception and its timestamp; the exception field is transient, so writeObject stringifies it before the object is serialized.
+ */
+public class ErrorInfo implements Serializable {
+
+	private static final long serialVersionUID = -6138942031953594202L;
+
+	private final transient Throwable exception;
+	private final long timestamp;
+
+	private volatile String exceptionAsString;
+
+	public ErrorInfo(Throwable exception, long timestamp) {
+		Preconditions.checkNotNull(exception);
+		Preconditions.checkArgument(timestamp > 0);
+
+		this.exception = exception;
+		this.timestamp = timestamp;
+	}
+
+	/**
+	 * Returns the contained exception.
+	 *
+	 * @return contained exception, or {@code null} if this object has been deserialized (the exception field is transient)
+	 */
+	Throwable getException() {
+		return exception;
+	}
+
+	/**
+	 * Returns the contained exception as a string.
+	 *
+	 * @return stringified form of the contained exception; computed on the first call and cached afterwards
+	 */
+	public String getExceptionAsString() {
+		if (exceptionAsString == null) {
+			exceptionAsString = ExceptionUtils.stringifyException(exception);
+		}
+		return exceptionAsString;
+	}
+
+	/**
+	 * Returns the timestamp for the contained exception.
+	 *
+	 * @return timestamp of the contained exception; always greater than 0, as enforced by the constructor
+	 */
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		// make sure that the exception was stringified so it isn't lost during serialization
+		if (exceptionAsString == null) {
+			exceptionAsString = ExceptionUtils.stringifyException(exception);
+		}
+		out.defaultWriteObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c13936..d4f7551 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -261,7 +261,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
-	private volatile Throwable failureCause;
+	private volatile ErrorInfo failureCause;
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
@@ -593,7 +593,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return state;
 	}
 
-	public Throwable getFailureCause() {
+	public ErrorInfo getFailureCause() {
 		return failureCause;
 	}
 
@@ -610,11 +610,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	@Override
-	public String getFailureCauseAsString() {
-		return ExceptionUtils.stringifyException(failureCause);
-	}
-
-	@Override
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
 	}
@@ -1034,6 +1029,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @param suspensionCause Cause of the suspension
 	 */
 	public void suspend(Throwable suspensionCause) {
+		suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
+	}
+
+	/**
+	 * Suspends the current ExecutionGraph.
+	 *
+	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
+	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+	 *
+	 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
+	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
+	 *
+	 * @param errorInfo ErrorInfo containing the cause of the suspension
+	 */
+	public void suspend(ErrorInfo errorInfo) {
+		Throwable suspensionCause = errorInfo != null
+			? errorInfo.getException()
+			: null;
+
 		while (true) {
 			JobStatus currentState = state;
 
@@ -1041,7 +1055,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
-				this.failureCause = suspensionCause;
+				this.failureCause = errorInfo;
 
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
@@ -1061,6 +1075,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	public void failGlobal(Throwable suspensionCause) {
+		failGlobal(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
+	}
+
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
@@ -1070,9 +1088,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
 	 * safe way to get consistency back.
 	 * 
-	 * @param t The exception that caused the failure.
+	 * @param errorInfo ErrorInfo containing the exception that caused the failure.
 	 */
-	public void failGlobal(Throwable t) {
+	public void failGlobal(ErrorInfo errorInfo) {
+		Throwable t = errorInfo != null
+			? errorInfo.getException()
+			: null;
+
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1082,14 +1104,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				return;
 			}
 			else if (current == JobStatus.RESTARTING) {
-				this.failureCause = t;
+				this.failureCause = errorInfo;
 
 				if (tryRestartOrFail()) {
 					return;
 				}
 			}
-			else if (transitionState(current, JobStatus.FAILING, t)) {
-				this.failureCause = t;
+			else if (transitionState(current, JobStatus.FAILING,t)) {
+				this.failureCause = errorInfo;
 
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
@@ -1375,6 +1397,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
+			Throwable failureCause = this.failureCause != null
+				? this.failureCause.getException()
+				: null;
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
@@ -1633,6 +1658,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// see what this means for us. currently, the first FAILED state means -> FAILED
 		if (newExecutionState == ExecutionState.FAILED) {
 			final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
+			long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
 
 			// by filtering out late failure calls, we can save some work in
 			// avoiding redundant local failover
@@ -1643,7 +1669,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				catch (Throwable t) {
 					// bug in the failover strategy - fall back to global failover
 					LOG.warn("Error in failover strategy - falling back to global restart", t);
-					failGlobal(ex);
+					failGlobal(new ErrorInfo(ex, timestamp));
 				}
 			}
 		}
@@ -1674,7 +1700,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			archivedVerticesInCreationOrder,
 			stateTimestamps,
 			getState(),
-			getFailureCauseAsString(),
+			failureCause,
 			getJsonPlan(),
 			getAccumulatorResultsStringified(),
 			serializedUserAccumulators,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 4e1d0f7..5ffc4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -167,7 +167,7 @@ public class ArchivedExecutionGraphTest {
 		assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
 		assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
 		assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-		assertEquals(runtimeGraph.getFailureCauseAsString(), archivedGraph.getFailureCauseAsString());
+		assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index b3af005..b3a8c33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -251,7 +251,7 @@ public class ExecutionGraphSuspendTest {
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-		assertEquals(exception, eg.getFailureCause());
+		assertEquals(exception, eg.getFailureCause().getException());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 6c8e758..f07113d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -365,7 +365,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 							.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
 					deadline.timeLeft());
 
-			String failureCause = jobFound.executionGraph().getFailureCauseAsString();
+			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
 
 			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
 			int causedByIndex = failureCause.indexOf("Caused by: ");