You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/27 09:56:59 UTC

[GitHub] asfgit closed pull request #6742: [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state

asfgit closed pull request #6742: [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state
URL: https://github.com/apache/flink/pull/6742
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 81cf784441d..3077f183acb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -94,8 +95,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 
 			try {
 				return jobResult.toJobExecutionResult(classLoader);
-			} catch (JobResult.WrappedJobException e) {
-				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e.getCause());
+			} catch (JobExecutionException e) {
+				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 			} catch (IOException | ClassNotFoundException e) {
 				throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 			}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..86cc52da3b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -263,8 +264,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 			try {
 				this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
 				return lastJobExecutionResult;
-			} catch (JobResult.WrappedJobException we) {
-				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), we.getCause());
+			} catch (JobExecutionException e) {
+				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
 			} catch (IOException | ClassNotFoundException e) {
 				throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
 			}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 75f16c03330..abe59d38bb6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -122,6 +123,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -229,6 +231,7 @@ public void testJobSubmitCancelStop() throws Exception {
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
 			new TestJobExecutionResultHandler(
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.SUCCEEDED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.build()));
@@ -351,11 +354,13 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 				new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
 				JobExecutionResultResponseBody.inProgress(),
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.SUCCEEDED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
 					.build()),
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
+					.applicationStatus(ApplicationStatus.FAILED)
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
@@ -385,8 +390,10 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 				restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
 				fail("Expected exception not thrown.");
 			} catch (final ProgramInvocationException e) {
-				assertThat(e.getCause(), instanceOf(RuntimeException.class));
-				assertThat(e.getCause().getMessage(), equalTo("expected"));
+				final Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
+
+				assertThat(cause.isPresent(), is(true));
+				assertThat(cause.get().getMessage(), equalTo("expected"));
 			}
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 60ddbe3e475..eb7c4734f1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -22,11 +22,13 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
@@ -54,6 +56,8 @@
 
 	private final JobID jobId;
 
+	private final ApplicationStatus applicationStatus;
+
 	private final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;
 
 	private final long netRuntime;
@@ -64,6 +68,7 @@
 
 	private JobResult(
 			final JobID jobId,
+			final ApplicationStatus applicationStatus,
 			final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults,
 			final long netRuntime,
 			@Nullable final SerializedThrowable serializedThrowable) {
@@ -71,6 +76,7 @@ private JobResult(
 		checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
 
 		this.jobId = requireNonNull(jobId);
+		this.applicationStatus = requireNonNull(applicationStatus);
 		this.accumulatorResults = requireNonNull(accumulatorResults);
 		this.netRuntime = netRuntime;
 		this.serializedThrowable = serializedThrowable;
@@ -80,13 +86,17 @@ private JobResult(
 	 * Returns {@code true} if the job finished successfully.
 	 */
 	public boolean isSuccess() {
-		return serializedThrowable == null;
+		return applicationStatus == ApplicationStatus.SUCCEEDED || (applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
 	}
 
 	public JobID getJobId() {
 		return jobId;
 	}
 
+	public ApplicationStatus getApplicationStatus() {
+		return applicationStatus;
+	}
+
 	public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorResults() {
 		return accumulatorResults;
 	}
@@ -108,22 +118,40 @@ public long getNetRuntime() {
 	 *
 	 * @param classLoader to use for deserialization
 	 * @return JobExecutionResult
-	 * @throws WrappedJobException if the JobResult contains a serialized exception
+	 * @throws JobCancellationException if the job was cancelled
+	 * @throws JobExecutionException if the job execution did not succeed
 	 * @throws IOException if the accumulator could not be deserialized
 	 * @throws ClassNotFoundException if the accumulator could not deserialized
 	 */
-	public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws WrappedJobException, IOException, ClassNotFoundException {
-		if (serializedThrowable != null) {
-			final Throwable throwable = serializedThrowable.deserializeError(classLoader);
-			throw new WrappedJobException(throwable);
-		}
+	public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException {
+		if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+			return new JobExecutionResult(
+				jobId,
+				netRuntime,
+				AccumulatorHelper.deserializeAccumulators(
+					accumulatorResults,
+					classLoader));
+		} else {
+			final Throwable cause;
+
+			if (serializedThrowable == null) {
+				cause = null;
+			} else {
+				cause = serializedThrowable.deserializeError(classLoader);
+			}
+
+			final JobExecutionException exception;
+
+			if (applicationStatus == ApplicationStatus.FAILED) {
+				exception = new JobExecutionException(jobId, "Job execution failed.", cause);
+			} else if (applicationStatus == ApplicationStatus.CANCELED) {
+				exception = new JobCancellationException(jobId, "Job was cancelled.", cause);
+			} else {
+				exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause);
+			}
 
-		return new JobExecutionResult(
-			jobId,
-			netRuntime,
-			AccumulatorHelper.deserializeAccumulators(
-				accumulatorResults,
-				classLoader));
+			throw exception;
+		}
 	}
 
 	/**
@@ -134,6 +162,8 @@ public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws W
 
 		private JobID jobId;
 
+		private ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
+
 		private Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults;
 
 		private long netRuntime = -1;
@@ -145,6 +175,11 @@ public Builder jobId(final JobID jobId) {
 			return this;
 		}
 
+		public Builder applicationStatus(final ApplicationStatus applicationStatus) {
+			this.applicationStatus = applicationStatus;
+			return this;
+		}
+
 		public Builder accumulatorResults(final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
 			this.accumulatorResults = accumulatorResults;
 			return this;
@@ -163,6 +198,7 @@ public Builder serializedThrowable(final SerializedThrowable serializedThrowable
 		public JobResult build() {
 			return new JobResult(
 				jobId,
+				applicationStatus,
 				accumulatorResults == null ? Collections.emptyMap() : accumulatorResults,
 				netRuntime,
 				serializedThrowable);
@@ -188,6 +224,8 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
 		final JobResult.Builder builder = new JobResult.Builder();
 		builder.jobId(jobId);
 
+		builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+
 		final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
 		// guard against clock changes
 		final long guardedNetRuntime = Math.max(netRuntime, 0L);
@@ -204,17 +242,4 @@ public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
 
 		return builder.build();
 	}
-
-	/**
-	 * Exception which indicates that the job has finished with an {@link Exception}.
-	 */
-	public static final class WrappedJobException extends FlinkException {
-
-		private static final long serialVersionUID = 6535061898650156019L;
-
-		public WrappedJobException(Throwable cause) {
-			super(cause);
-		}
-	}
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 8054a383739..bbdb099ae0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -623,8 +623,6 @@ public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionEx
 
 		try {
 			return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
-		} catch (JobResult.WrappedJobException e) {
-			throw new JobExecutionException(job.getJobID(), e.getCause());
 		} catch (IOException | ClassNotFoundException e) {
 			throw new JobExecutionException(job.getJobID(), e);
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index e568f476c7e..8342eb374a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
@@ -68,6 +69,7 @@ public JobResultDeserializer() {
 	@Override
 	public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
 		JobID jobId = null;
+		ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
 		long netRuntime = -1;
 		SerializedThrowable serializedThrowable = null;
 		Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = null;
@@ -85,6 +87,10 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
 					assertNextToken(p, JsonToken.VALUE_STRING);
 					jobId = jobIdDeserializer.deserialize(p, ctxt);
 					break;
+				case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
+					assertNextToken(p, JsonToken.VALUE_STRING);
+					applicationStatus = ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+					break;
 				case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
 					assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
 					netRuntime = p.getLongValue();
@@ -105,6 +111,7 @@ public JobResult deserialize(final JsonParser p, final DeserializationContext ct
 		try {
 			return new JobResult.Builder()
 				.jobId(jobId)
+				.applicationStatus(applicationStatus)
 				.netRuntime(netRuntime)
 				.accumulatorResults(accumulatorResults)
 				.serializedThrowable(serializedThrowable)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 694fa2f529b..cdf3541fe3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -44,6 +44,8 @@
 
 	static final String FIELD_NAME_JOB_ID = "id";
 
+	static final String FIELD_NAME_APPLICATION_STATUS = "application-status";
+
 	static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
 
 	static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
@@ -76,6 +78,9 @@ public void serialize(
 		gen.writeFieldName(FIELD_NAME_JOB_ID);
 		jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+		gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
+		gen.writeString(result.getApplicationStatus().name());
+
 		gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
 		gen.writeStartObject();
 		final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = result.getAccumulatorResults();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index 84c9da5e7de..6543fa24b1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -66,4 +73,62 @@ public void testIsSuccess() throws Exception {
 		assertThat(jobResult.isSuccess(), equalTo(true));
 	}
 
+	@Test
+	public void testCancelledJobIsFailureResult() {
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.CANCELED)
+				.build());
+
+		assertThat(jobResult.isSuccess(), is(false));
+	}
+
+	@Test
+	public void testFailedJobIsFailureResult() {
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.FAILED)
+				.setFailureCause(new ErrorInfo(new FlinkException("Test exception"), 42L))
+				.build());
+
+		assertThat(jobResult.isSuccess(), is(false));
+	}
+
+	@Test
+	public void testCancelledJobThrowsJobCancellationException() throws Exception {
+		final FlinkException cause = new FlinkException("Test exception");
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.CANCELED)
+				.setFailureCause(new ErrorInfo(cause, 42L))
+				.build());
+
+		try {
+			jobResult.toJobExecutionResult(getClass().getClassLoader());
+			fail("Job should fail with an JobCancellationException.");
+		} catch (JobCancellationException expected) {
+			assertThat(expected.getCause(), is(equalTo(cause)));
+		}
+	}
+
+	@Test
+	public void testFailedJobThrowsJobExecutionException() throws Exception {
+		final FlinkException cause = new FlinkException("Test exception");
+		final JobResult jobResult = JobResult.createFrom(
+			new ArchivedExecutionGraphBuilder()
+				.setJobID(new JobID())
+				.setState(JobStatus.FAILED)
+				.setFailureCause(new ErrorInfo(cause, 42L))
+				.build());
+
+		try {
+			jobResult.toJobExecutionResult(getClass().getClassLoader());
+			fail("Job should fail with JobExecutionException.");
+		} catch (JobExecutionException expected) {
+			assertThat(expected.getCause(), is(equalTo(cause)));
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 9534d2bca7c..c8cc7f3e7c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.util.OptionalFailure;
@@ -64,12 +65,14 @@
 		return Arrays.asList(new Object[][] {
 			{JobExecutionResultResponseBody.created(new JobResult.Builder()
 				.jobId(TEST_JOB_ID)
+				.applicationStatus(ApplicationStatus.SUCCEEDED)
 				.netRuntime(TEST_NET_RUNTIME)
 				.accumulatorResults(TEST_ACCUMULATORS)
 				.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
 				.build())},
 			{JobExecutionResultResponseBody.created(new JobResult.Builder()
 				.jobId(TEST_JOB_ID)
+				.applicationStatus(ApplicationStatus.FAILED)
 				.netRuntime(TEST_NET_RUNTIME)
 				.accumulatorResults(TEST_ACCUMULATORS)
 				.build())},
@@ -108,6 +111,7 @@ protected void assertOriginalEqualsToUnmarshalled(
 			assertNotNull(actualJobExecutionResult);
 
 			assertThat(actualJobExecutionResult.getJobId(), equalTo(expectedJobExecutionResult.getJobId()));
+			assertThat(actualJobExecutionResult.getApplicationStatus(), equalTo(expectedJobExecutionResult.getApplicationStatus()));
 			assertThat(actualJobExecutionResult.getNetRuntime(), equalTo(expectedJobExecutionResult.getNetRuntime()));
 			assertThat(actualJobExecutionResult.getAccumulatorResults(), equalTo(expectedJobExecutionResult.getAccumulatorResults()));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services