You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/27 09:55:09 UTC

[flink] branch master updated: [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f88d42  [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state
2f88d42 is described below

commit 2f88d428e43bdea5240aced63fb130fce6aa80c4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 21:09:19 2018 +0200

    [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state
    
    In case of the CANCELED state, the client will throw a JobCancellationException.
    In case of the FAILED state, the client will throw a JobExecutionException.
    
    This closes #6742.
---
 .../flink/client/program/MiniClusterClient.java    |  5 +-
 .../client/program/rest/RestClusterClient.java     |  5 +-
 .../client/program/rest/RestClusterClientTest.java | 11 ++-
 .../apache/flink/runtime/jobmaster/JobResult.java  | 79 ++++++++++++++--------
 .../flink/runtime/minicluster/MiniCluster.java     |  2 -
 .../rest/messages/json/JobResultDeserializer.java  |  7 ++
 .../rest/messages/json/JobResultSerializer.java    |  5 ++
 .../flink/runtime/jobmaster/JobResultTest.java     | 65 ++++++++++++++++++
 .../job/JobExecutionResultResponseBodyTest.java    |  4 ++
 9 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 81cf784..3077f18 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -94,8 +95,8 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
 			try {
 				return jobResult.toJobExecutionResult(classLoader);
-			} catch (JobResult.WrappedJobException e) {
-				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e.getCause());
+			} catch (JobExecutionException e) {
+				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 			} catch (IOException | ClassNotFoundException e) {
 				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 			}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07f..86cc52d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@ import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -263,8 +264,8 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			try {
 				this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
 				return lastJobExecutionResult;
-			} catch (JobResult.WrappedJobException we) {
-				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), we.getCause());
+			} catch (JobExecutionException e) {
+				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
 			} catch (IOException | ClassNotFoundException e) {
 				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
 			}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 75f16c0..abe59d3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -122,6 +123,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -229,6 +231,7 @@ public class RestClusterClientTest extends TestLogger {
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
 			new TestJobExecutionResultHandler(
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.SUCCEEDED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.build()));
@@ -351,11 +354,13 @@ public class RestClusterClientTest extends TestLogger {
 				new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
 				JobExecutionResultResponseBody.inProgress(),
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.SUCCEEDED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
 					.build()),
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.FAILED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
@@ -385,8 +390,10 @@ public class RestClusterClientTest extends TestLogger {
 				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
 				fail("Expected exception not thrown.");
 			} catch (final ProgramInvocationException e) {
-				assertThat(e.getCause(), instanceOf(RuntimeException.class));
-				assertThat(e.getCause().getMessage(), equalTo("expected"));
+				final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+
+				assertThat(cause.isPresent(), is(true));
+				assertThat(cause.get().getMessage(), equalTo("expected"));
 			}
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 60ddbe3..eb7c473 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -22,11 +22,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
@@ -54,6 +56,8 @@ public class JobResult implements Serializable {
 
 	private final JobID jobId;
 
+	private final ApplicationStatus applicationStatus;
+
 	private final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;
 
 	private final long netRuntime;
@@ -64,6 +68,7 @@ public class JobResult implements Serializable {
 
 	private JobResult(
 			final JobID jobId,
+			final ApplicationStatus applicationStatus,
 			final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
 			final long netRuntime,
 			@Nullable final SerializedThrowable serializedThrowable) {
@@ -71,6 +76,7 @@ public class JobResult implements Serializable {
 		checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
 
 		this.jobId = requireNonNull(jobId);
+		this.applicationStatus = requireNonNull(applicationStatus);
 		this.accumulatorResults = requireNonNull(accumulatorResults);
 		this.netRuntime = netRuntime;
 		this.serializedThrowable = serializedThrowable;
@@ -80,13 +86,17 @@ public class JobResult implements Serializable {
 	 * Returns {@code true} if the job finished successfully.
 	 */
 	public boolean isSuccess() {
-		return serializedThrowable == null;
+		return applicationStatus == ApplicationStatus.SUCCEEDED || (applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
 	}
 
 	public JobID getJobId() {
 		return jobId;
 	}
 
+	public ApplicationStatus getApplicationStatus() {
+		return applicationStatus;
+	}
+
 	public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorResults() {
 		return accumulatorResults;
 	}
@@ -108,22 +118,40 @@ public class JobResult implements Serializable {
 	 *
 	 * @param classLoader to use for deserialization
 	 * @return JobExecutionResult
-	 * @throws WrappedJobException if the JobResult contains a serialized exception
+	 * @throws JobCancellationException if the job was cancelled
+	 * @throws JobExecutionException if the job execution did not succeed
 	 * @throws IOException if the accumulator could not be deserialized
 	 * @throws ClassNotFoundException if the accumulator could not deserialized
 	 */
-	public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
-		if (serializedThrowable != null) {
-			final Throwable throwable = serializedThrowable.deserializeError(classLoader);
-			throw new WrappedJobException(throwable);
-		}
+	public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException {
+		if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+			return new JobExecutionResult(
+				jobId,
+				netRuntime,
+				AccumulatorHelper.deserializeAccumulators(
+					accumulatorResults,
+					classLoader));
+		} else {
+			final Throwable cause;
+
+			if (serializedThrowable == null) {
+				cause = null;
+			} else {
+				cause = serializedThrowable.deserializeError(classLoader);
+			}
+
+			final JobExecutionException exception;
+
+			if (applicationStatus == ApplicationStatus.FAILED) {
+				exception = new JobExecutionException(jobId, "Job execution failed.", cause);
+			} else if (applicationStatus == ApplicationStatus.CANCELED) {
+				exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
+			} else {
+				exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause);
+			}
 
-		return new JobExecutionResult(
-			jobId,
-			netRuntime,
-			AccumulatorHelper.deserializeAccumulators(
-				accumulatorResults,
-				classLoader));
+			throw exception;
+		}
 	}
 
 	/**
@@ -134,6 +162,8 @@ public class JobResult implements Serializable {
 
 		private JobID jobId;
 
+		private ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
+
 		private Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;
 
 		private long netRuntime = -1;
@@ -145,6 +175,11 @@ public class JobResult implements Serializable {
 			return this;
 		}
 
+		public Builder applicationStatus(final ApplicationStatus applicationStatus) {
+			this.applicationStatus = applicationStatus;
+			return this;
+		}
+
 		public Builder accumulatorResults(final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
 			this.accumulatorResults = accumulatorResults;
 			return this;
@@ -163,6 +198,7 @@ public class JobResult implements Serializable {
 		public JobResult build() {
 			return new JobResult(
 				jobId,
+				applicationStatus,
 				accumulatorResults == null ? Collections.emptyMap() : accumulatorResults,
 				netRuntime,
 				serializedThrowable);
@@ -188,6 +224,8 @@ public class JobResult implements Serializable {
 		final JobResult.Builder builder = new JobResult.Builder();
 		builder.jobId(jobId);
 
+		builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+
 		final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
 		// guard against clock changes
 		final long guardedNetRuntime = Math.max(netRuntime, 0L);
@@ -204,17 +242,4 @@ public class JobResult implements Serializable {
 
 		return builder.build();
 	}
-
-	/**
-	 * Exception which indicates that the job has finished with an {@link Exception}.
-	 */
-	public static final class WrappedJobException extends FlinkException {
-
-		private static final long serialVersionUID = 6535061898650156019L;
-
-		public WrappedJobException(Throwable cause) {
-			super(cause);
-		}
-	}
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 8054a38..bbdb099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -623,8 +623,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 		try {
 			return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
-		} catch (JobResult.WrappedJobException e) {
-			throw new JobExecutionException(job.getJobID(), e.getCause());
 		} catch (IOException | ClassNotFoundException e) {
 			throw new JobExecutionException(job.getJobID(), e);
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index e568f47..8342eb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
@@ -68,6 +69,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> {
 	@Override
 	public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
 		JobID jobId = null;
+		ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
 		long netRuntime = -1;
 		SerializedThrowable serializedThrowable = null;
 		Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = null;
@@ -85,6 +87,10 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> {
 					assertNextToken(p, JsonToken.VALUE_STRING);
 					jobId = jobIdDeserializer.deserialize(p, ctxt);
 					break;
+				case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
+					assertNextToken(p, JsonToken.VALUE_STRING);
+					applicationStatus = ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+					break;
 				case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
 					assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
 					netRuntime = p.getLongValue();
@@ -105,6 +111,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> {
 		try {
 			return new JobResult.Builder()
 				.jobId(jobId)
+				.applicationStatus(applicationStatus)
 				.netRuntime(netRuntime)
 				.accumulatorResults(accumulatorResults)
 				.serializedThrowable(serializedThrowable)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 694fa2f..cdf3541 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -44,6 +44,8 @@ public class JobResultSerializer extends StdSerializer<JobResult> {
 
 	static final String FIELD_NAME_JOB_ID = "id";
 
+	static final String FIELD_NAME_APPLICATION_STATUS = "application-status";
+
 	static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
 
 	static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
@@ -76,6 +78,9 @@ public class JobResultSerializer extends StdSerializer<JobResult> {
 		gen.writeFieldName(FIELD_NAME_JOB_ID);
 		jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+		gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
+		gen.writeString(result.getApplicationStatus().name());
+
 		gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
 		gen.writeStartObject();
 		final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = result.getAccumulatorResults();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index 84c9da5..6543fa2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -66,4 +73,62 @@ public class JobResultTest extends TestLogger {
 		assertThat(jobResult.isSuccess(), equalTo(true));
 	}
 
+	@Test
+	public void testCancelledJobIsFailureResult() {
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.CANCELED)
+				.build());
+
+		assertThat(jobResult.isSuccess(), is(false));
+	}
+
+	@Test
+	public void testFailedJobIsFailureResult() {
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.FAILED)
+				.setFailureCause(new ErrorInfo(new FlinkException("Test exception"), 42L))
+				.build());
+
+		assertThat(jobResult.isSuccess(), is(false));
+	}
+
+	@Test
+	public void testCancelledJobThrowsJobCancellationException() throws Exception {
+		final FlinkException cause = new FlinkException("Test exception");
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.CANCELED)
+				.setFailureCause(new ErrorInfo(cause, 42L))
+				.build());
+
+		try {
+			jobResult.toJobExecutionResult(getClass().getClassLoader());
+			fail("Job should fail with an JobCancellationException.");
+		} catch (JobCancellationException expected) {
+			assertThat(expected.getCause(), is(equalTo(cause)));
+		}
+	}
+
+	@Test
+	public void testFailedJobThrowsJobExecutionException() throws Exception {
+		final FlinkException cause = new FlinkException("Test exception");
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.FAILED)
+				.setFailureCause(new ErrorInfo(cause, 42L))
+				.build());
+
+		try {
+			jobResult.toJobExecutionResult(getClass().getClassLoader());
+			fail("Job should fail with JobExecutionException.");
+		} catch (JobExecutionException expected) {
+			assertThat(expected.getCause(), is(equalTo(cause)));
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 9534d2b..c8cc7f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.util.OptionalFailure;
@@ -64,12 +65,14 @@ public class JobExecutionResultResponseBodyTest
 		return Arrays.asList(new Object[][] {
 			{JobExecutionResultResponseBody.created(new JobResult.Builder()
 				.jobId(TEST_JOB_ID)
+				.applicationStatus(ApplicationStatus.SUCCEEDED)
 				.netRuntime(TEST_NET_RUNTIME)
 				.accumulatorResults(TEST_ACCUMULATORS)
 				.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
 				.build())},
 			{JobExecutionResultResponseBody.created(new JobResult.Builder()
 				.jobId(TEST_JOB_ID)
+				.applicationStatus(ApplicationStatus.FAILED)
 				.netRuntime(TEST_NET_RUNTIME)
 				.accumulatorResults(TEST_ACCUMULATORS)
 				.build())},
@@ -108,6 +111,7 @@ public class JobExecutionResultResponseBodyTest
 			assertNotNull(actualJobExecutionResult);
 
 			assertThat(actualJobExecutionResult.getJobId(), equalTo(expectedJobExecutionResult.getJobId()));
+			assertThat(actualJobExecutionResult.getApplicationStatus(), equalTo(expectedJobExecutionResult.getApplicationStatus()));
 			assertThat(actualJobExecutionResult.getNetRuntime(), equalTo(expectedJobExecutionResult.getNetRuntime()));
 			assertThat(actualJobExecutionResult.getAccumulatorResults(), equalTo(expectedJobExecutionResult.getAccumulatorResults()));