You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/31 14:13:58 UTC
[5/5] flink git commit: [FLINK-2543] [core] Make exception
communication and result/failure notifications consistent with respect to
serialization of exceptions
[FLINK-2543] [core] Make exception communication and result/failure notifications consistent with respect to serialization of exceptions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13a95961
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13a95961
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13a95961
Branch: refs/heads/release-0.10.0-milestone-1
Commit: 13a95961f495efe890b8999deb47076f217cb418
Parents: e315a66
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 28 21:05:20 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 31 13:01:05 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/program/Client.java | 4 +-
.../apache/flink/client/program/ClientTest.java | 18 +-
.../flink/runtime/akka/FlinkUntypedActor.java | 3 +-
.../client/JobCancellationException.java | 2 +-
.../apache/flink/runtime/client/JobClient.java | 156 +++++++++++-------
.../flink/runtime/client/JobClientActor.java | 30 ++--
.../runtime/client/JobExecutionException.java | 11 +-
.../flink/runtime/executiongraph/Execution.java | 5 +-
.../runtime/executiongraph/ExecutionGraph.java | 23 +--
.../runtime/taskmanager/TaskExecutionState.java | 23 ++-
.../flink/runtime/util/SerializedThrowable.java | 163 +++++++++++++------
.../flink/runtime/jobmanager/JobManager.scala | 68 ++++----
.../messages/ExecutionGraphMessages.scala | 20 ++-
.../runtime/messages/JobManagerMessages.scala | 14 ++
.../runtime/minicluster/FlinkMiniCluster.scala | 16 +-
.../runtime/jobmanager/JobManagerTest.java | 8 +-
.../runtime/taskmanager/TaskCancelTest.java | 3 +-
.../taskmanager/TaskExecutionStateTest.java | 3 +-
.../runtime/util/SerializedThrowableTest.java | 140 ++++++++++++++++
.../jobmanager/CoLocationConstraintITCase.scala | 6 +-
.../runtime/jobmanager/JobManagerITCase.scala | 97 ++++++-----
.../runtime/jobmanager/RecoveryITCase.scala | 15 +-
.../runtime/jobmanager/SlotSharingITCase.scala | 10 +-
.../TaskManagerFailsWithSlotSharingITCase.scala | 19 +--
.../flink/streaming/util/ClusterUtil.java | 5 +-
.../accumulators/AccumulatorLiveITCase.java | 7 +-
.../test/classloading/ClassLoaderITCase.java | 48 +++---
.../jobmanager/JobManagerFailsITCase.scala | 14 +-
.../taskmanager/TaskManagerFailsITCase.scala | 30 ++--
29 files changed, 622 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 2f5d888..e90a39c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -425,10 +425,10 @@ public class Client {
try{
if (wait) {
return JobClient.submitJobAndWait(actorSystem,
- jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, this.userCodeClassLoader);
+ jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
}
else {
- JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout);
+ JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
// return a dummy execution result with the JobId
return new JobSubmissionResult(jobGraph.getJobID());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 594525e..46de93d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
+
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
@@ -39,15 +40,18 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
-import org.junit.After;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+
import scala.Option;
import scala.Some;
import scala.Tuple2;
@@ -225,16 +229,17 @@ public class ClientTest {
public static class SuccessReturningActor extends FlinkUntypedActor {
- private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+ private final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
@Override
public void handleMessage(Object message) {
if (message instanceof JobManagerMessages.SubmitJob) {
JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
getSender().tell(
- decorateMessage(new Status.Success(jid)),
+ decorateMessage(new JobManagerMessages.JobSubmitSuccess(jid)),
getSelf());
- } else if(message instanceof JobManagerMessages.RequestLeaderSessionID$) {
+ }
+ else if (message.getClass() == JobManagerMessages.getRequestLeaderSessionID().getClass()) {
getSender().tell(
decorateMessage(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionID)),
getSelf());
@@ -254,12 +259,13 @@ public class ClientTest {
public static class FailureReturningActor extends FlinkUntypedActor {
- private Option<UUID> leaderSessionID = Option.<UUID>apply(UUID.randomUUID());
+ private Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
@Override
public void handleMessage(Object message) {
getSender().tell(
- decorateMessage(new Status.Failure(new Exception("test"))),
+ decorateMessage(new JobManagerMessages.JobResultFailure(
+ new SerializedThrowable(new Exception("test")))),
getSelf());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 1456758..0623862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -39,7 +39,8 @@ import java.util.UUID;
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {
- protected Logger LOG = LoggerFactory.getLogger(getClass());
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* This method is called by Akka if a new message has arrived for the actor. It logs the
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
index 842870d..559d805 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
@@ -28,7 +28,7 @@ public class JobCancellationException extends JobExecutionException {
private static final long serialVersionUID = 2818087325120827526L;
- public JobCancellationException(final JobID jobID, final String msg, final Throwable cause){
+ public JobCancellationException(JobID jobID, String msg, Throwable cause){
super(jobID, msg, cause);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 44d2c00..7507643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -25,7 +25,7 @@ import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Preconditions;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
-
import org.apache.flink.runtime.util.SerializedThrowable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +53,8 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeoutException;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* The JobClient bridges between the JobManager's asynchronous actor messages and
* the synchronous method calls to trigger.
@@ -87,7 +89,6 @@ public class JobClient {
* @return The socket address of the JobManager actor system
*/
public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
-
String jobManagerAddress = config.getString(
ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
@@ -131,10 +132,10 @@ public class JobClient {
boolean sysoutLogUpdates,
ClassLoader userCodeClassloader) throws JobExecutionException {
- Preconditions.checkNotNull(actorSystem, "The actorSystem must not be null.");
- Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
- Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
- Preconditions.checkNotNull(timeout, "The timeout must not be null.");
+ checkNotNull(actorSystem, "The actorSystem must not be null.");
+ checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+ checkNotNull(jobGraph, "The jobGraph must not be null.");
+ checkNotNull(timeout, "The timeout must not be null.");
// for this job, we create a proxy JobClientActor that deals with all communication with
// the JobManager. It forwards the job submission, checks the success/failure responses, logs
@@ -148,35 +149,15 @@ public class JobClient {
jobManagerGateway.leaderSessionID());
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-
+
+ // first block handles errors while waiting for the result
+ Object answer;
try {
Future<Object> future = Patterns.ask(jobClientActor,
new JobClientMessages.SubmitJobAndWait(jobGraph),
new Timeout(AkkaUtils.INF_TIMEOUT()));
-
- Object answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
-
- if (answer instanceof JobManagerMessages.JobResultSuccess) {
- LOG.info("Job execution complete");
-
- SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
- if (result != null) {
- return result.toJobExecutionResult(userCodeClassloader);
- } else {
- throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
- }
- } else {
- throw new Exception("Unknown answer after submitting the job: " + answer);
- }
- }
- catch (JobExecutionException e) {
- if(e.getCause() instanceof SerializedThrowable) {
- SerializedThrowable serializedThrowable = (SerializedThrowable)e.getCause();
- Throwable deserialized = serializedThrowable.deserializeError(userCodeClassloader);
- throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed " + deserialized.getMessage(), deserialized);
- } else {
- throw e;
- }
+
+ answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
}
catch (TimeoutException e) {
throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " +
@@ -190,6 +171,48 @@ public class JobClient {
// failsafe shutdown of the client actor
jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
+
+ // second block handles the actual response
+ if (answer instanceof JobManagerMessages.JobResultSuccess) {
+ LOG.info("Job execution complete");
+
+ SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
+ if (result != null) {
+ try {
+ return result.toJobExecutionResult(userCodeClassloader);
+ }
+ catch (Throwable t) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Job was successfully executed but JobExecutionResult could not be deserialized.");
+ }
+ }
+ else {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Job was successfully executed but result contained a null JobExecutionResult.");
+ }
+ }
+ if (answer instanceof JobManagerMessages.JobResultFailure) {
+ LOG.info("Job execution failed");
+
+ SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
+ if (serThrowable != null) {
+ Throwable cause = serThrowable.deserializeError(userCodeClassloader);
+ if (cause instanceof JobExecutionException) {
+ throw (JobExecutionException) cause;
+ }
+ else {
+ throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
+ }
+ }
+ else {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Job execution failed with null as failure cause.");
+ }
+ }
+ else {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Unknown answer from JobManager after submitting the job: " + answer);
+ }
}
/**
@@ -203,31 +226,20 @@ public class JobClient {
public static void submitJobDetached(
ActorGateway jobManagerGateway,
JobGraph jobGraph,
- FiniteDuration timeout) throws JobExecutionException {
-
- Preconditions.checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
- Preconditions.checkNotNull(jobGraph, "The jobGraph must not be null.");
- Preconditions.checkNotNull(timeout, "The timeout must not be null.");
-
- Future<Object> future = jobManagerGateway.ask(
- new JobManagerMessages.SubmitJob(jobGraph, false),
- timeout);
+ FiniteDuration timeout,
+ ClassLoader userCodeClassloader) throws JobExecutionException {
+ checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
+ checkNotNull(jobGraph, "The jobGraph must not be null.");
+ checkNotNull(timeout, "The timeout must not be null.");
+
+ Object result;
try {
- Object result = Await.result(future, timeout);
- if (result instanceof JobID) {
- JobID respondedID = (JobID) result;
- if (!respondedID.equals(jobGraph.getJobID())) {
- throw new Exception("JobManager responded for wrong Job. This Job: " +
- jobGraph.getJobID() + ", response: " + respondedID);
- }
- }
- else {
- throw new Exception("Unexpected response: " + result);
- }
- }
- catch (JobExecutionException e) {
- throw e;
+ Future<Object> future = jobManagerGateway.ask(
+ new JobManagerMessages.SubmitJob(jobGraph, false),
+ timeout);
+
+ result = Await.result(future, timeout);
}
catch (TimeoutException e) {
throw new JobTimeoutException(jobGraph.getJobID(),
@@ -237,6 +249,33 @@ public class JobClient {
throw new JobExecutionException(jobGraph.getJobID(),
"Failed to send job to JobManager: " + t.getMessage(), t.getCause());
}
+
+ if (result instanceof JobManagerMessages.JobSubmitSuccess) {
+ JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
+
+ // validate response
+ if (!respondedID.equals(jobGraph.getJobID())) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "JobManager responded for wrong Job. This Job: " +
+ jobGraph.getJobID() + ", response: " + respondedID);
+ }
+ }
+ else if (result instanceof JobManagerMessages.JobResultFailure) {
+ try {
+ SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
+ throw t.deserializeError(userCodeClassloader);
+ }
+ catch (JobExecutionException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "JobSubmission failed: " + t.getMessage(), t);
+ }
+ }
+ else {
+ throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
+ }
}
/**
@@ -249,13 +288,10 @@ public class JobClient {
* @param timeout Timeout for futures
* @throws IOException Thrown, if the file upload to the JobManager failed.
*/
- public static void uploadJarFiles(
- JobGraph jobGraph,
- ActorGateway jobManagerGateway,
- FiniteDuration timeout)
+ public static void uploadJarFiles(JobGraph jobGraph, ActorGateway jobManagerGateway, FiniteDuration timeout)
throws IOException {
+
if (jobGraph.hasUsercodeJarFiles()) {
-
Future<Object> futureBlobPort = jobManagerGateway.ask(
JobManagerMessages.getRequestBlobManagerPort(),
timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index b4bfc8f..16c6baf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -43,17 +43,15 @@ public class JobClientActor extends FlinkUntypedActor {
private final Logger logger;
private final boolean sysoutUpdates;
- // leader session ID of the JobManager when this actor was created
+ /** leader session ID of the JobManager when this actor was created */
private final Option<UUID> leaderSessionID;
- // Actor which submits a job to the JobManager via this actor
+ /** Actor which submits a job to the JobManager via this actor */
private ActorRef submitter;
- public JobClientActor(
- ActorRef jobManager,
- Logger logger,
- boolean sysoutUpdates,
- Option<UUID> leaderSessionID) {
+ public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates,
+ Option<UUID> leaderSessionID) {
+
this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
@@ -113,9 +111,14 @@ public class JobClientActor extends FlinkUntypedActor {
}
// acknowledgement to submit job is only logged, our original
// submitter is only interested in the final job result
- else if (message instanceof JobManagerMessages.JobResultSuccess) {
+ else if (message instanceof JobManagerMessages.JobResultSuccess ||
+ message instanceof JobManagerMessages.JobResultFailure) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received {} message from JobManager", message.getClass().getSimpleName());
+ }
+
// forward the success to the original job submitter
- logger.debug("Received JobResultSuccess message from JobManager");
if (this.submitter != null) {
this.submitter.tell(decorateMessage(message), getSelf());
}
@@ -124,17 +127,10 @@ public class JobClientActor extends FlinkUntypedActor {
getContext().unwatch(jobManager);
getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
}
- else if (message instanceof Status.Success) {
+ else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
// job was successfully submitted :-)
logger.info("Job was successfully submitted to the JobManager");
}
- else if (message instanceof Status.Failure) {
- // job execution failed, inform the actor that submitted the job
- logger.debug("Received failure from JobManager", ((Status.Failure) message).cause());
- if (submitter != null) {
- submitter.tell(decorateMessage(message), sender());
- }
- }
// =========== Actor / Communication Failure ===============
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 56ccef5..7871a8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -29,7 +29,7 @@ public class JobExecutionException extends Exception {
private static final long serialVersionUID = 2818087325120827525L;
- private JobID jobID;
+ private final JobID jobID;
/**
* Constructs a new job execution exception.
@@ -37,21 +37,18 @@ public class JobExecutionException extends Exception {
* @param msg The cause for the execution exception.
* @param cause The cause of the exception
*/
- public JobExecutionException(final JobID jobID, final String msg, final Throwable cause) {
+ public JobExecutionException(JobID jobID, String msg, Throwable cause) {
super(msg, cause);
-
this.jobID = jobID;
}
- public JobExecutionException(final JobID jobID, final String msg) {
+ public JobExecutionException(JobID jobID, String msg) {
super(msg);
-
this.jobID = jobID;
}
- public JobExecutionException(final JobID jobID, final Throwable cause) {
+ public JobExecutionException(JobID jobID, Throwable cause) {
super(cause);
-
this.jobID = jobID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c7191fa..189682b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,9 +46,12 @@ import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -137,7 +140,7 @@ public class Execution implements Serializable {
private ExecutionContext executionContext;
/* Lock for updating the accumulators atomically. */
- private final Object accumulatorLock = new Object();
+ private final SerializableObject accumulatorLock = new SerializableObject();
/* Continuously updated map of user-defined accumulators */
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9648a8f..3602372 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorSystem;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -45,8 +46,8 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils;
-
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -908,9 +909,11 @@ public class ExecutionGraph implements Serializable {
// --------------------------------------------------------------------------------------------
/**
- * Updates the state of the Task and sets the final accumulator results.
- * @param state
- * @return
+ * Updates the state of one of the ExecutionVertex's Execution attempts.
+ * If the new status is "FINISHED", this also updates the final accumulator results.
+ *
+ * @param state The state update.
+ * @return True, if the task update was properly applied, false, if the execution attempt was not found.
*/
public boolean updateState(TaskExecutionState state) {
Execution attempt = this.currentExecutions.get(state.getID());
@@ -926,8 +929,9 @@ public class ExecutionGraph implements Serializable {
AccumulatorSnapshot accumulators = state.getAccumulators();
flinkAccumulators = accumulators.deserializeFlinkAccumulators();
userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
- } catch (Exception e) {
- // Exceptions would be thrown in the future here
+ }
+ catch (Exception e) {
+ // we do not fail the job on deserialization problems of accumulators, but only log
LOG.error("Failed to deserialize final accumulator results.", e);
}
@@ -1029,12 +1033,9 @@ public class ExecutionGraph implements Serializable {
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListenerActors.size() > 0) {
- SerializedThrowable serializedThrowable = null;
- if(error != null) {
- serializedThrowable = new SerializedThrowable(error);
- }
ExecutionGraphMessages.JobStatusChanged message =
- new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), serializedThrowable);
+ new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
+ error == null ? null : new SerializedThrowable(error));
for (ActorGateway listener: jobStatusListenerActors) {
listener.tell(message);
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 07b7ee8..c7ea06e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -104,7 +104,7 @@ public class TaskExecutionState implements java.io.Serializable {
this.jobID = jobID;
this.executionId = executionId;
this.executionState = executionState;
- if(error != null) {
+ if (error != null) {
this.throwable = new SerializedThrowable(error);
} else {
this.throwable = null;
@@ -115,18 +115,18 @@ public class TaskExecutionState implements java.io.Serializable {
// --------------------------------------------------------------------------------------------
/**
- * Gets the attached exception. Requires to pass a classloader, because the
- * class of the exception may be user-defined and hence only accessible through
- * the user code classloader, not the default classloader.
- *
- * @param usercodeClassloader The class loader for the user code of the
- * job this update refers to.
+ * Gets the attached exception, deserializing it from its stored serialized form. Returns null
+ * if the status update is not a failure with an associated exception.
+ *
+ * @param userCodeClassloader The classloader that can resolve user-defined exceptions.
+ * @return The attached exception, or null, if none.
*/
- public Throwable getError(ClassLoader usercodeClassloader) {
+ public Throwable getError(ClassLoader userCodeClassloader) {
if (this.throwable == null) {
return null;
- } else {
- return throwable.deserializeError(usercodeClassloader);
+ }
+ else {
+ return this.throwable.deserializeError(userCodeClassloader);
}
}
@@ -173,8 +173,7 @@ public class TaskExecutionState implements java.io.Serializable {
return other.jobID.equals(this.jobID) &&
other.executionId.equals(this.executionId) &&
other.executionState == this.executionState &&
- (other.throwable == null ? this.throwable == null :
- (this.throwable != null && throwable.equals(other.throwable) ));
+ (other.throwable == null) == (this.throwable == null);
}
else {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index 6e5a558..59178c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -15,101 +15,164 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.util;
-import com.google.common.base.Preconditions;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
import java.io.Serializable;
-import java.util.Arrays;
+import java.lang.ref.WeakReference;
/**
- * Utility class for dealing with serialized Throwables.
- * Needed to send around user-specific exception classes with Akka.
+ * Utility class for dealing with user-defined Throwable types that are serialized (for
+ * example during RPC/Actor communication), but cannot be resolved with the default
+ * class loader.
+ * <p>
+ * This exception mimics the original exception with respect to message and stack trace,
+ * and contains the original exception in serialized form. The original exception
+ * can be re-obtained by supplying the appropriate class loader.
*/
public class SerializedThrowable extends Exception implements Serializable {
+
private static final long serialVersionUID = 7284183123441947635L;
- private final byte[] serializedError;
+
+ /** The original exception in serialized form */
+ private final byte[] serializedException;
+
+ /** Name of the original error class */
+ private final String originalErrorClassName;
+
+ /** The stringified stack trace of the original exception, printed by printStackTrace() */
+ private final String fullStingifiedStackTrace;
- // The exception must not be (de)serialized with the class, as its
- // class may not be part of the system class loader.
- private transient Throwable cachedError;
+ /** A guaranteed serializable placeholder exception that will be used as
+ * cause and to capture the original stack trace */
+ private final Exception placeholder;
+
+ /** The original exception, not transported via serialization,
+ * because the class may not be part of the system class loader.
+ * In addition, we make sure our cached references do not prevent
+ * unloading the exception class. */
+ private transient WeakReference<Throwable> cachedException;
/**
* Create a new SerializedThrowable.
- * @param error The exception to serialize.
+ *
+ * @param exception The exception to serialize. Must not be null (it is dereferenced).
*/
- public SerializedThrowable(Throwable error) {
- Preconditions.checkNotNull(error, "The exception to serialize has to be set");
- this.cachedError = error;
- byte[] serializedError;
- try {
- serializedError = InstantiationUtil.serializeObject(error);
- }
- catch (Throwable t) {
- // could not serialize exception. send the stringified version instead
+ public SerializedThrowable(Throwable exception) {
+ super(getMessageOrError(exception));
+
+ if (!(exception instanceof SerializedThrowable)) {
+ this.cachedException = new WeakReference<Throwable>(exception);
+
+ this.originalErrorClassName = exception.getClass().getName();
+ this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
+ this.placeholder = new Exception(
+ "Serialized representation of " + originalErrorClassName + ": " + getMessage());
+ this.placeholder.setStackTrace(exception.getStackTrace());
+ initCause(this.placeholder);
+
+ byte[] serialized;
try {
- this.cachedError = new Exception(ExceptionUtils.stringifyException(error));
- serializedError = InstantiationUtil.serializeObject(this.cachedError);
+ serialized = InstantiationUtil.serializeObject(exception);
}
- catch (Throwable tt) {
- // seems like we cannot do much to report the actual exception
- // report a placeholder instead
+ catch (Throwable t) {
+ // could not serialize the exception; serialize the placeholder instead
try {
- this.cachedError = new Exception("Cause is a '" + error.getClass().getName()
- + "' (failed to serialize or stringify)");
- serializedError = InstantiationUtil.serializeObject(this.cachedError);
+ serialized = InstantiationUtil.serializeObject(placeholder);
}
- catch (Throwable ttt) {
- // this should never happen unless the JVM is fubar.
- // we just report the state without the error
- this.cachedError = null;
- serializedError = null;
+ catch (IOException e) {
+ // this should really never happen, as we only serialize a standard exception
+ throw new RuntimeException(e.getMessage(), e);
}
}
+ this.serializedException = serialized;
+ }
+ else {
+ // copy from that serialized throwable
+ SerializedThrowable other = (SerializedThrowable) exception;
+ this.serializedException = other.serializedException;
+ this.originalErrorClassName = other.originalErrorClassName;
+ this.fullStingifiedStackTrace = other.fullStingifiedStackTrace;
+ this.placeholder = other.placeholder;
+ this.cachedException = other.cachedException;
}
- this.serializedError = serializedError;
}
public Throwable deserializeError(ClassLoader userCodeClassloader) {
- if (this.cachedError == null) {
+ Throwable cached = cachedException == null ? null : cachedException.get();
+ if (cached == null) {
try {
- cachedError = (Throwable) InstantiationUtil.deserializeObject(this.serializedError, userCodeClassloader);
+ cached = (Throwable) InstantiationUtil.deserializeObject(serializedException, userCodeClassloader);
+ cachedException = new WeakReference<Throwable>(cached);
}
catch (Exception e) {
- throw new RuntimeException("Error while deserializing the attached exception", e);
+ return placeholder;
}
}
- return this.cachedError;
+ return cached;
}
+
+ public String getStrigifiedStackTrace() {
+ return fullStingifiedStackTrace;
+ }
+
+ // ------------------------------------------------------------------------
+ // Override the behavior of Throwable
+ // ------------------------------------------------------------------------
@Override
- public boolean equals(Object obj) {
- if(obj instanceof SerializedThrowable) {
- return Arrays.equals(this.serializedError, ((SerializedThrowable)obj).serializedError);
- }
- return false;
+ public Throwable getCause() {
+ return placeholder;
}
@Override
+ public void printStackTrace(PrintStream s) {
+ s.print(fullStingifiedStackTrace);
+ s.flush();
+ }
+
+ @Override
+ public void printStackTrace(PrintWriter s) {
+ s.print(fullStingifiedStackTrace);
+ s.flush();
+ }
+
+ @Override
public String toString() {
- if(cachedError != null) {
- return cachedError.getClass().getName() + ": " + cachedError.getMessage();
- }
- if(serializedError == null) {
- return "(null)"; // can not happen as per Ctor check.
- } else {
- return "(serialized)";
- }
+ String message = getLocalizedMessage();
+ return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
+ }
+
+ @Override
+ public StackTraceElement[] getStackTrace() {
+ return placeholder.getStackTrace();
}
+ // ------------------------------------------------------------------------
+ // Static utilities
+ // ------------------------------------------------------------------------
+
public static Throwable get(Throwable serThrowable, ClassLoader loader) {
- if(serThrowable instanceof SerializedThrowable) {
+ if (serThrowable instanceof SerializedThrowable) {
return ((SerializedThrowable)serThrowable).deserializeError(loader);
} else {
return serThrowable;
}
}
+
+ private static String getMessageOrError(Throwable error) {
+ try {
+ return error.getMessage();
+ }
+ catch (Throwable t) {
+ return "(failed to get message)";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a67c23a..e9a31fb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -23,7 +23,7 @@ import java.lang.reflect.{InvocationTargetException, Constructor}
import java.net.InetSocketAddress
import java.util.{UUID, Collections}
-import akka.actor.Status.{Failure, Success}
+import akka.actor.Status.Failure
import akka.actor._
import grizzled.slf4j.Logger
@@ -326,21 +326,18 @@ class JobManager(
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName
- val deserializedError = if(error != null) {
- error.deserializeError(executionGraph.getUserClassLoader)
- } else null
log.info(
s"Status of job $jobID (${executionGraph.getJobName}) changed to $newJobStatus.",
- deserializedError)
+ error)
- if (newJobStatus.isTerminalState) {
+ if (newJobStatus.isTerminalState()) {
jobInfo.end = timeStamp
// is the client waiting for the job result?
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
- executionGraph.getAccumulatorsSerialized
+ executionGraph.getAccumulatorsSerialized()
} catch {
case e: Exception =>
log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
@@ -353,24 +350,26 @@ class JobManager(
jobInfo.client ! decorateMessage(JobResultSuccess(result))
case JobStatus.CANCELED =>
- jobInfo.client ! decorateMessage(
- Failure(
- new JobCancellationException(
- jobID,
- "Job was cancelled.",
- new SerializedThrowable(deserializedError))))
+ // the error may be packed as a serialized throwable
+ val unpackedError = SerializedThrowable.get(
+ error, executionGraph.getUserClassLoader())
+
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(
+ new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
case JobStatus.FAILED =>
- jobInfo.client ! decorateMessage(
- Failure(
- new JobExecutionException(
- jobID,
- "Job execution failed.",
- new SerializedThrowable(deserializedError))))
+ val unpackedError = SerializedThrowable.get(
+ error, executionGraph.getUserClassLoader())
+
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(
+ new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
case x =>
val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
- jobInfo.client ! decorateMessage(Failure(new SerializedThrowable(exception)))
+ jobInfo.client ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(exception)))
throw exception
}
@@ -525,11 +524,11 @@ class JobManager(
*/
private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
if (jobGraph == null) {
- sender ! decorateMessage(
- Failure(
+ sender() ! decorateMessage(JobResultFailure(
+ new SerializedThrowable(
new JobSubmissionException(null, "JobGraph must not be null.")
)
- )
+ ))
}
else {
val jobId = jobGraph.getJobID
@@ -568,17 +567,17 @@ class JobManager(
executionContext,
jobGraph.getJobID,
jobGraph.getName,
- jobGraph.getJobConfiguration,
+ jobGraph.getJobConfiguration(),
timeout,
- jobGraph.getUserJarBlobKeys,
+ jobGraph.getUserJarBlobKeys(),
userCodeLoader),
JobInfo(sender(), System.currentTimeMillis())
)
)._1
// configure the execution graph
- val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
- jobGraph.getNumberOfExecutionRetries
+ val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) {
+ jobGraph.getNumberOfExecutionRetries()
} else {
defaultExecutionRetries
}
@@ -612,8 +611,9 @@ class JobManager(
vertex.initializeOnMaster(userCodeLoader)
}
catch {
- case t: Throwable => throw new JobExecutionException(jobId,
- "Cannot initialize task '" + vertex.getName + "': " + t.getMessage, t)
+ case t: Throwable =>
+ throw new JobExecutionException(jobId,
+ "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
}
}
@@ -643,13 +643,13 @@ class JobManager(
}
val triggerVertices: java.util.List[ExecutionJobVertex] =
- snapshotSettings.getVerticesToTrigger.asScala.map(idToVertex).asJava
+ snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
val ackVertices: java.util.List[ExecutionJobVertex] =
- snapshotSettings.getVerticesToAcknowledge.asScala.map(idToVertex).asJava
+ snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
val confirmVertices: java.util.List[ExecutionJobVertex] =
- snapshotSettings.getVerticesToConfirm.asScala.map(idToVertex).asJava
+ snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
executionGraph.enableSnapshotCheckpointing(
snapshotSettings.getCheckpointInterval,
@@ -673,7 +673,7 @@ class JobManager(
}
// done with submitting the job
- sender ! decorateMessage(Success(jobGraph.getJobID))
+ sender() ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
}
catch {
case t: Throwable =>
@@ -692,7 +692,7 @@ class JobManager(
new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t)
}
- sender ! decorateMessage(Failure(rt))
+ sender() ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
return
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index ce6fdf3..83bafaa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -25,13 +25,16 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
-import org.apache.flink.runtime.util.SerializedThrowable
/**
* This object contains the execution graph specific messages.
*/
object ExecutionGraphMessages {
+ // --------------------------------------------------------------------------
+ // Messages
+ // --------------------------------------------------------------------------
+
/**
* Denotes the execution state change of an
* [[org.apache.flink.runtime.executiongraph.ExecutionVertex]]
@@ -64,6 +67,7 @@ object ExecutionGraphMessages {
} else {
""
}
+
s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
1}/$totalNumberOfSubTasks) switched to $newExecutionState $oMsg"
}
@@ -75,23 +79,29 @@ object ExecutionGraphMessages {
* @param jobID identifying the corresponding job
* @param newJobStatus
* @param timestamp
- * @param serializedError
+ * @param error The error that caused the status change, if any (may be null)
*/
case class JobStatusChanged(
jobID: JobID,
newJobStatus: JobStatus,
timestamp: Long,
- serializedError: SerializedThrowable)
+ error: Throwable)
extends RequiresLeaderSessionID {
+
override def toString: String = {
s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus."
}
}
+ // --------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------
+
private val DATE_FORMATTER: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
private def timestampToString(timestamp: Long): String = {
- DATE_FORMATTER.format(new Date(timestamp))
+ DATE_FORMATTER.synchronized { // SimpleDateFormat is not thread-safe; guard the shared instance
+ DATE_FORMATTER.format(new Date(timestamp))
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index e38986b..1c250af 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
import org.apache.flink.runtime.instance.{InstanceID, Instance}
import org.apache.flink.runtime.io.network.partition.ResultPartitionID
import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.util.SerializedThrowable
import scala.collection.JavaConverters._
@@ -163,10 +164,23 @@ object JobManagerMessages {
case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
/**
+ * Denotes a successful job submission.
+ * @param jobId The job's ID.
+ */
+ case class JobSubmitSuccess(jobId: JobID)
+
+ /**
* Denotes a successful job execution.
+ * @param result The result of the job execution, in serialized form.
*/
case class JobResultSuccess(result: SerializedJobExecutionResult)
+ /**
+ * Denotes an unsuccessful job submission or job execution.
+ * @param cause The exception that caused the job submission or execution to fail, in serialized form.
+ */
+ case class JobResultFailure(cause: SerializedThrowable)
+
sealed trait CancellationResponse{
def jobID: JobID
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5925c96..c4c35f8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,23 +19,25 @@
package org.apache.flink.runtime.minicluster
import java.net.InetAddress
-import akka.pattern.Patterns.gracefulStop
+import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
+
import com.typesafe.config.Config
+
import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient,
-SerializedJobExecutionResult}
+import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.webmonitor.WebMonitor
+
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
@@ -258,12 +260,16 @@ abstract class FlinkMiniCluster(
jobGraph,
timeout,
printUpdates,
- this.getClass.getClassLoader)
+ this.getClass().getClassLoader())
}
@throws(classOf[JobExecutionException])
def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
- JobClient.submitJobDetached(getJobManagerGateway(), jobGraph, timeout)
+ JobClient.submitJobDetached(
+ getJobManagerGateway(),
+ jobGraph,
+ timeout,
+ getClass().getClassLoader())
new JobSubmissionResult(jobGraph.getJobID)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 8b01f06..217f46e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -19,9 +19,10 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorSystem;
-import akka.actor.Status;
import akka.testkit.JavaTestKit;
+
import com.typesafe.config.Config;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
@@ -46,9 +48,11 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import scala.Option;
import scala.Some;
import scala.Tuple2;
@@ -124,7 +128,7 @@ public class JobManagerTest {
// Submit the job and wait for all vertices to be running
jobManagerGateway.tell(new SubmitJob(jobGraph, false), testActorGateway);
- expectMsgClass(Status.Success.class);
+ expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
jobManagerGateway.tell(
new WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()),
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 52154e1..1d0b3b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -109,7 +109,8 @@ public class TaskCancelTest {
// Run test
JobClient.submitJobDetached(
- flink.getJobManagerGateway(), jobGraph, TestingUtils.TESTING_DURATION());
+ flink.getJobManagerGateway(), jobGraph,
+ TestingUtils.TESTING_DURATION(), getClass().getClassLoader());
// Wait for the job to make some progress and then cancel
awaitRunning(
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index f4c7a57..b3f2456 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
+
import org.junit.Test;
/**
@@ -88,7 +89,7 @@ public class TaskExecutionStateTest {
}
@Test
- public void hanleNonSerializableException() {
+ public void handleNonSerializableException() {
try {
@SuppressWarnings({"ThrowableInstanceNeverThrown", "serial"})
Exception hostile = new Exception() {
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
new file mode 100644
index 0000000..3dca362
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.CodeSource;
+import java.security.Permissions;
+import java.security.ProtectionDomain;
+import java.security.cert.Certificate;
+
+import static org.junit.Assert.*;
+
+public class SerializedThrowableTest {
+
+ @Test
+ public void testIdenticalMessageAndStack() {
+ try {
+ IllegalArgumentException original = new IllegalArgumentException("test message");
+ SerializedThrowable serialized = new SerializedThrowable(original);
+
+ assertEquals(original.getMessage(), serialized.getMessage());
+ assertEquals(original.toString(), serialized.toString());
+
+ assertEquals(ExceptionUtils.stringifyException(original),
+ ExceptionUtils.stringifyException(serialized));
+
+ assertArrayEquals(original.getStackTrace(), serialized.getStackTrace());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialization() {
+ try {
+ // We need an exception whose class is not in the core class loader.
+ // We solve that by defining an exception class dynamically.
+
+ // bytecode of TestExceptionForSerialization, a subclass of java.lang.Exception
+ final byte[] classData = {
+ -54, -2, -70, -66, 0, 0, 0, 51, 0, 21, 10, 0, 3, 0, 18, 7, 0, 19, 7, 0, 20, 1,
+ 0, 16, 115, 101, 114, 105, 97, 108, 86, 101, 114, 115, 105, 111, 110, 85, 73,
+ 68, 1, 0, 1, 74, 1, 0, 13, 67, 111, 110, 115, 116, 97, 110, 116, 86, 97, 108,
+ 117, 101, 5, -103, -52, 22, -41, -23, -36, -25, 47, 1, 0, 6, 60, 105, 110, 105,
+ 116, 62, 1, 0, 3, 40, 41, 86, 1, 0, 4, 67, 111, 100, 101, 1, 0, 15, 76, 105,
+ 110, 101, 78, 117, 109, 98, 101, 114, 84, 97, 98, 108, 101, 1, 0, 18, 76, 111,
+ 99, 97, 108, 86, 97, 114, 105, 97, 98, 108, 101, 84, 97, 98, 108, 101, 1, 0, 4,
+ 116, 104, 105, 115, 1, 0, 61, 76, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101,
+ 47, 102, 108, 105, 110, 107, 47, 114, 117, 110, 116, 105, 109, 101, 47, 117,
+ 116, 105, 108, 47, 84, 101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111,
+ 110, 70, 111, 114, 83, 101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111,
+ 110, 59, 1, 0, 10, 83, 111, 117, 114, 99, 101, 70, 105, 108, 101, 1, 0, 34, 84,
+ 101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111, 110, 70, 111, 114, 83,
+ 101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111, 110, 46, 106, 97, 118, 97,
+ 12, 0, 9, 0, 10, 1, 0, 59, 111, 114, 103, 47, 97, 112, 97, 99, 104, 101, 47,
+ 102, 108, 105, 110, 107, 47, 114, 117, 110, 116, 105, 109, 101, 47, 117, 116,
+ 105, 108, 47, 84, 101, 115, 116, 69, 120, 99, 101, 112, 116, 105, 111, 110, 70,
+ 111, 114, 83, 101, 114, 105, 97, 108, 105, 122, 97, 116, 105, 111, 110, 1, 0,
+ 19, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 69, 120, 99, 101, 112, 116,
+ 105, 111, 110, 0, 33, 0, 2, 0, 3, 0, 0, 0, 1, 0, 26, 0, 4, 0, 5, 0, 1, 0, 6, 0,
+ 0, 0, 2, 0, 7, 0, 1, 0, 1, 0, 9, 0, 10, 0, 1, 0, 11, 0, 0, 0, 47, 0, 1, 0, 1, 0,
+ 0, 0, 5, 42, -73, 0, 1, -79, 0, 0, 0, 2, 0, 12, 0, 0, 0, 6, 0, 1, 0, 0, 0, 21,
+ 0, 13, 0, 0, 0, 12, 0, 1, 0, 0, 0, 5, 0, 14, 0, 15, 0, 0, 0, 1, 0, 16, 0, 0, 0,
+ 2, 0, 17};
+
+ // class loader without URLs of its own (it still delegates to the system class loader)
+ ClassLoader loader = new URLClassLoader(new URL[0]);
+
+ // define the class in that loader, so the test's own class loader cannot see it
+ Class<?> clazz = MemoryUtils.UNSAFE.defineClass(
+ "org.apache.flink.runtime.util.TestExceptionForSerialization",
+ classData, 0, classData.length,
+ loader,
+ new ProtectionDomain(new CodeSource(null, (Certificate[]) null), new Permissions()));
+
+ // create an instance of the exception (no message, no cause)
+ Exception userException = clazz.asSubclass(Exception.class).newInstance();
+
+ // check that we cannot simply copy the exception
+ try {
+ byte[] serialized = InstantiationUtil.serializeObject(userException);
+ InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+ fail("should fail with a class not found exception");
+ }
+ catch (ClassNotFoundException e) {
+ // expected: the test's class loader cannot resolve the dynamically defined class
+ }
+
+ // validate that the SerializedThrowable mimics the original exception
+ SerializedThrowable serialized = new SerializedThrowable(userException);
+ assertEquals(userException.getMessage(), serialized.getMessage());
+ assertEquals(userException.toString(), serialized.toString());
+ assertEquals(ExceptionUtils.stringifyException(userException),
+ ExceptionUtils.stringifyException(serialized));
+ assertArrayEquals(userException.getStackTrace(), serialized.getStackTrace());
+
+ // copy the serialized throwable and make sure everything still works
+ SerializedThrowable copy = CommonTestUtils.createCopySerializable(serialized);
+ assertEquals(userException.getMessage(), copy.getMessage());
+ assertEquals(userException.toString(), copy.toString());
+ assertEquals(ExceptionUtils.stringifyException(userException),
+ ExceptionUtils.stringifyException(copy));
+ assertArrayEquals(userException.getStackTrace(), copy.getStackTrace());
+
+ // deserialize the original exception via the class loader that defined its class
+ Throwable deserialized = copy.deserializeError(loader);
+ assertEquals(clazz, deserialized.getClass());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 718496c..94ead78 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -24,7 +24,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -68,12 +68,12 @@ class CoLocationConstraintITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val gateway = cluster.getJobManagerGateway
+ val gateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
gateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 3238dd5..ddca3e2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -66,10 +66,10 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Test Job", vertex)
val cluster = TestingUtils.startTestingCluster(1)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
- val response = (jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration)).mapTo[Int]
+ val response = jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration).mapTo[Int]
val availableSlots = Await.result(response, duration)
@@ -77,21 +77,16 @@ class JobManagerITCase(_system: ActorSystem)
within(2 second) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
-
- val success = expectMsgType[Success]
-
- jobGraph.getJobID should equal(success.status)
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID()))
}
within(2 second) {
- val response = expectMsgType[Failure]
- val exception = SerializedThrowable.get(response.cause, this.getClass.getClassLoader)
+ val response = expectMsgType[JobResultFailure]
+ val exception = response.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
- val cause = e.getCause.asInstanceOf[SerializedThrowable].deserializeError(
- this.getClass.getClassLoader)
- new NoResourceAvailableException(1,1,0) should equal(cause)
+ new NoResourceAvailableException(1,1,0) should equal(e.getCause())
case e => fail(s"Received wrong exception of type $e.")
}
}
@@ -113,10 +108,10 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Test Job", vertex)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
- val response = (jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration)).mapTo[Int]
+ val response = jmGateway.ask(RequestTotalNumberOfSlots, timeout.duration).mapTo[Int]
val availableSlots = Await.result(response, duration)
@@ -125,9 +120,9 @@ class JobManagerITCase(_system: ActorSystem)
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+
val result = expectMsgType[JobResultSuccess]
-
result.result.getJobId() should equal(jobGraph.getJobID)
}
@@ -149,13 +144,13 @@ class JobManagerITCase(_system: ActorSystem)
jobGraph.setAllowQueuedScheduling(true)
val cluster = TestingUtils.startTestingCluster(10)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
val result = expectMsgType[JobResultSuccess]
@@ -184,13 +179,13 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
val result = expectMsgType[JobResultSuccess]
@@ -219,13 +214,13 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Bipartite Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
@@ -256,15 +251,15 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
@@ -301,12 +296,12 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
val cluster = TestingUtils.startTestingCluster(6 * num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
@@ -345,13 +340,13 @@ class JobManagerITCase(_system: ActorSystem)
jobGraph.setScheduleMode(ScheduleMode.ALL)
val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
@@ -379,7 +374,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
@@ -389,10 +384,10 @@ class JobManagerITCase(_system: ActorSystem)
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -427,7 +422,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
@@ -437,10 +432,10 @@ class JobManagerITCase(_system: ActorSystem)
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -472,15 +467,15 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(2 * num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -512,7 +507,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
@@ -520,10 +515,10 @@ class JobManagerITCase(_system: ActorSystem)
expectMsg(num_tasks)
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -560,7 +555,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
@@ -568,10 +563,10 @@ class JobManagerITCase(_system: ActorSystem)
expectMsg(num_tasks)
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -603,13 +598,13 @@ class JobManagerITCase(_system: ActorSystem)
val jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, sink)
val cluster = TestingUtils.startTestingCluster(2*num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try{
within(TestingUtils.TESTING_DURATION){
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 0c0e28a..d454e69 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.Status.Success
import akka.actor.{PoisonPill, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{ JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
@@ -79,13 +78,13 @@ class RecoveryITCase(_system: ActorSystem)
jobGraph.setNumberOfExecutionRetries(1)
val cluster = startTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s")
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION){
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
val result = expectMsgType[JobResultSuccess]
@@ -122,13 +121,13 @@ class RecoveryITCase(_system: ActorSystem)
jobGraph.setNumberOfExecutionRetries(1)
val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION){
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
val result = expectMsgType[JobResultSuccess]
@@ -166,13 +165,13 @@ class RecoveryITCase(_system: ActorSystem)
val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION){
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID), self)
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 406b054..23e8ac1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -24,7 +24,7 @@ import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobSubmitSuccess, JobResultSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -66,12 +66,12 @@ class SlotSharingITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
@@ -110,12 +110,12 @@ class SlotSharingITCase(_system: ActorSystem)
val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
val cluster = TestingUtils.startTestingCluster(num_tasks)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
expectMsgType[JobResultSuccess]
}
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/13a95961/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 7017c33..ae51da2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.jobmanager
-import akka.actor.Status.{Failure, Success}
import akka.actor.{Kill, ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.apache.flink.runtime.util.SerializedThrowable
@@ -70,13 +69,13 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
val jobID = jobGraph.getJobID
val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
val taskManagers = cluster.getTaskManagers
try{
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
@@ -85,8 +84,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
//kill task manager
taskManagers(0) ! PoisonPill
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)
@@ -119,13 +118,13 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
val jobID = jobGraph.getJobID
val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
- val jmGateway = cluster.getJobManagerGateway
+ val jmGateway = cluster.getJobManagerGateway()
val taskManagers = cluster.getTaskManagers
try{
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, false), self)
- expectMsg(Success(jobGraph.getJobID))
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
expectMsg(AllVerticesRunning(jobID))
@@ -133,8 +132,8 @@ class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
//kill task manager
taskManagers(0) ! Kill
- val failure = expectMsgType[Failure]
- val exception = SerializedThrowable.get(failure.cause, this.getClass.getClassLoader)
+ val failure = expectMsgType[JobResultFailure]
+ val exception = failure.cause.deserializeError(getClass.getClassLoader())
exception match {
case e: JobExecutionException =>
jobGraph.getJobID should equal(e.getJobID)