You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/24 17:12:38 UTC

[2/3] flink git commit: [FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.

[FLINK-8466] [runtime] Make sure ErrorInfo references no user-defined classes.

That way, holding on to the ErrorInfo does not prevent class unloading.

However, this implies that the ErrorInfo must not hold strong references to any Exception classes.
For that reason, the commit pull the "ground truth" exception into a separate fields, so that the
ExecutionGraph logic itself can always assume to have the proper ground-truth exception.

This closes #5348


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/524c5013
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/524c5013
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/524c5013

Branch: refs/heads/master
Commit: 524c5013a1e877cdaf2f4269863a49851eabda85
Parents: 012413c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 23 22:00:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jan 24 17:50:18 2018 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java |  2 +-
 .../executiongraph/AccessExecutionGraph.java    |  2 +-
 .../executiongraph/ArchivedExecutionGraph.java  |  2 +-
 .../flink/runtime/executiongraph/ErrorInfo.java | 34 +++--------
 .../runtime/executiongraph/ExecutionGraph.java  | 64 ++++++++------------
 .../rest/handler/job/JobExceptionsHandler.java  |  2 +-
 .../handler/legacy/JobExceptionsHandler.java    |  2 +-
 .../ArchivedExecutionGraphTest.java             |  2 +-
 .../runtime/executiongraph/ErrorInfoTest.java   | 62 +++++++++++++++++++
 .../ExecutionGraphRestartTest.java              |  2 +-
 .../ExecutionGraphSuspendTest.java              |  2 +-
 .../legacy/JobExceptionsHandlerTest.java        |  4 +-
 12 files changed, 107 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 73ad7fa..623e42b 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -342,7 +342,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger {
 						.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
 				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-		String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
+		String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString();
 
 		assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState());
 		assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index c38f818..362afa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -70,7 +70,7 @@ public interface AccessExecutionGraph {
 	 * @return failure causing exception, or null
 	 */
 	@Nullable
-	ErrorInfo getFailureCause();
+	ErrorInfo getFailureInfo();
 
 	/**
 	 * Returns the job vertex for the given {@link JobVertexID}.

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 4481e1b..20c2c8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -143,7 +143,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public ErrorInfo getFailureCause() {
+	public ErrorInfo getFailureInfo() {
 		return failureCause;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index d919bfa..9fe569f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
@@ -32,26 +30,25 @@ public class ErrorInfo implements Serializable {
 
 	private static final long serialVersionUID = -6138942031953594202L;
 
-	private final transient Throwable exception;
-	private final long timestamp;
+	/** The exception that we keep holding forever. Has no strong reference to any user-defined code. */
+	private final SerializedThrowable exception;
 
-	private volatile String exceptionAsString;
+	private final long timestamp;
 
 	public ErrorInfo(Throwable exception, long timestamp) {
 		Preconditions.checkNotNull(exception);
 		Preconditions.checkArgument(timestamp > 0);
 
-		this.exception = exception;
+		this.exception = exception instanceof SerializedThrowable ?
+				(SerializedThrowable) exception : new SerializedThrowable(exception);
 		this.timestamp = timestamp;
 	}
 
 	/**
-	 * Returns the contained exception.
-	 *
-	 * @return contained exception, or {@code "(null)"} if either no exception was set or this object has been deserialized
+	 * Returns the serialized form of the original exception.
 	 */
-	Throwable getException() {
-		return exception;
+	public SerializedThrowable getException() {
+		return this.exception;
 	}
 
 	/**
@@ -60,10 +57,7 @@ public class ErrorInfo implements Serializable {
 	 * @return failure causing exception as a string, or {@code "(null)"}
 	 */
 	public String getExceptionAsString() {
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		return exceptionAsString;
+		return exception.getFullStringifiedStackTrace();
 	}
 
 	/**
@@ -74,12 +68,4 @@ public class ErrorInfo implements Serializable {
 	public long getTimestamp() {
 		return timestamp;
 	}
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		// make sure that the exception was stringified so it isn't lost during serialization
-		if (exceptionAsString == null) {
-			exceptionAsString = ExceptionUtils.stringifyException(exception);
-		}
-		out.defaultWriteObject();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5c2f0f6..d187faa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -264,7 +264,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
-	private volatile ErrorInfo failureCause;
+	private volatile Throwable failureCause;
+
+	/** The extended failure cause information for the job. This exists in addition to 'failureCause',
+	 * to let 'failureCause' be a strong reference to the exception, while this info holds no
+	 * strong reference to any user-defined classes.*/
+	private volatile ErrorInfo failureInfo;
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
@@ -619,10 +624,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return state;
 	}
 
-	public ErrorInfo getFailureCause() {
+	public Throwable getFailureCause() {
 		return failureCause;
 	}
 
+	public ErrorInfo getFailureInfo() {
+		return failureInfo;
+	}
+
 	/**
 	 * Gets the number of full restarts that the execution graph went through.
 	 * If a full restart recovery is currently pending, this recovery is included in the
@@ -1034,25 +1043,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * @param suspensionCause Cause of the suspension
 	 */
 	public void suspend(Throwable suspensionCause) {
-		suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
-	}
-
-	/**
-	 * Suspends the current ExecutionGraph.
-	 *
-	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
-	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
-	 *
-	 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
-	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
-	 *
-	 * @param errorInfo ErrorInfo containing the cause of the suspension
-	 */
-	public void suspend(ErrorInfo errorInfo) {
-		Throwable suspensionCause = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
 		while (true) {
 			JobStatus currentState = state;
 
@@ -1060,7 +1050,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
-				this.failureCause = errorInfo;
+				initFailureCause(suspensionCause);
 
 				// make sure no concurrent local actions interfere with the cancellation
 				incrementGlobalModVersion();
@@ -1080,10 +1070,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void failGlobal(Throwable error) {
-		failGlobal(new ErrorInfo(error, System.currentTimeMillis()));
-	}
-
 	/**
 	 * Fails the execution graph globally. This failure will not be recovered by a specific
 	 * failover strategy, but results in a full restart of all tasks.
@@ -1093,13 +1079,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
 	 * safe way to get consistency back.
 	 *
-	 * @param errorInfo ErrorInfo containing the exception that caused the failure.
+	 * @param t The exception that caused the failure.
 	 */
-	public void failGlobal(ErrorInfo errorInfo) {
-		Throwable t = errorInfo != null
-			? errorInfo.getException()
-			: null;
-
+	public void failGlobal(Throwable t) {
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1111,7 +1093,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (current == JobStatus.RESTARTING) {
 				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
 				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				final long globalVersionForRestart = incrementGlobalModVersion();
 				if (tryRestartOrFail(globalVersionForRestart)) {
@@ -1119,7 +1101,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {
-				this.failureCause = errorInfo;
+				initFailureCause(t);
 
 				// make sure no concurrent local or global actions interfere with the failover
 				final long globalVersionForRestart = incrementGlobalModVersion();
@@ -1322,6 +1304,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
 	}
 
+	private void initFailureCause(Throwable t) {
+		this.failureCause = t;
+		this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Job Status Progress
 	// ------------------------------------------------------------------------
@@ -1417,9 +1404,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
-			Throwable failureCause = this.failureCause != null
-				? this.failureCause.getException()
-				: null;
+			final Throwable failureCause = this.failureCause;
+
 			synchronized (progressLock) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
@@ -1696,7 +1682,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				catch (Throwable t) {
 					// bug in the failover strategy - fall back to global failover
 					LOG.warn("Error in failover strategy - falling back to global restart", t);
-					failGlobal(new ErrorInfo(ex, timestamp));
+					failGlobal(ex);
 				}
 			}
 		}
@@ -1728,7 +1714,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			archivedVerticesInCreationOrder,
 			stateTimestamps,
 			getState(),
-			failureCause,
+			failureInfo,
 			getJsonPlan(),
 			getAccumulatorResultsStringified(),
 			serializedUserAccumulators,

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 62f3e5e..70b9d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -68,7 +68,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 
 	@Override
 	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
-		ErrorInfo rootException = executionGraph.getFailureCause();
+		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 6a4cc0d..a6bae86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -92,7 +92,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStartObject();
 
 		// most important is the root failure cause
-		ErrorInfo rootException = graph.getFailureCause();
+		ErrorInfo rootException = graph.getFailureInfo();
 		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
 			gen.writeStringField("root-exception", rootException.getExceptionAsString());
 			gen.writeNumberField("timestamp", rootException.getTimestamp());

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index ba93b7b..5635763 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -163,7 +163,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 		assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
 		assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
 		assertEquals(runtimeGraph.getState(), archivedGraph.getState());
-		assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
+		assertEquals(runtimeGraph.getFailureInfo().getExceptionAsString(), archivedGraph.getFailureInfo().getExceptionAsString());
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
 		assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
new file mode 100644
index 0000000..4841365
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ErrorInfoTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test for the {@link ErrorInfo}.
+ */
+public class ErrorInfoTest {
+
+	@Test
+	public void testSerializationWithExceptionOutsideClassLoader() throws Exception {
+		final ErrorInfo error = new ErrorInfo(new ExceptionWithCustomClassLoader(), System.currentTimeMillis());
+		final ErrorInfo copy = CommonTestUtils.createCopySerializable(error);
+
+		assertEquals(error.getTimestamp(), copy.getTimestamp());
+		assertEquals(error.getExceptionAsString(), copy.getExceptionAsString());
+		assertEquals(error.getException().getMessage(), copy.getException().getMessage());
+
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ExceptionWithCustomClassLoader extends Exception {
+
+		private static final long serialVersionUID = 42L;
+
+		private static final ClassLoader CUSTOM_LOADER = new URLClassLoader(new URL[0]);
+
+		@SuppressWarnings("unused")
+		private final Serializable outOfClassLoader = CommonTestUtils.createObjectForClassNotInClassPath(CUSTOM_LOADER);
+
+		public ExceptionWithCustomClassLoader() {
+			super("tada");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index ccaed96..cc8ed7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -816,7 +816,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
 
-		final Throwable t = eg.getFailureCause().getException();
+		final Throwable t = eg.getFailureCause();
 		if (!(t instanceof NoResourceAvailableException)) {
 			ExceptionUtils.rethrowException(t, t.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index b5a29c3..52d4c81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -252,7 +252,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-		assertEquals(exception, eg.getFailureCause().getException());
+		assertEquals(exception, eg.getFailureCause());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/524c5013/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
index 641bf96..0e96f36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -76,8 +76,8 @@ public class JobExceptionsHandlerTest {
 	private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
-		Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
-		Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
+		Assert.assertEquals(originalJob.getFailureInfo().getExceptionAsString(), result.get("root-exception").asText());
+		Assert.assertEquals(originalJob.getFailureInfo().getTimestamp(), result.get("timestamp").asLong());
 
 		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");