Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:49:12 UTC

[69/89] [abbrv] flink git commit: [FLINK-4273] Modify JobClient to attach to running jobs

[FLINK-4273] Modify JobClient to attach to running jobs

These changes are required for FLINK-4272 (introduce a JobClient class
for job control). Essentially, we want to be able to re-attach to a
running job and monitor it. It should make no difference whether we
have just submitted the job or are re-attaching to it via an existing JobID.

This PR modifies the JobClientActor to support two different operation
modes: a) submit a job and monitor it, and b) re-attach to a running job
and monitor it.

The JobClient class has been updated with methods to access this
functionality. Previously, the class only had `submitJobAndWait` and
`submitJobDetached`. Now it has the additional methods `submitJob`,
`attachToRunningJob`, and `awaitJobResult`.

The job submission has been split into two phases:

1a. submitJob(..)
Submit a job and return a future which can be passed to
`awaitJobResult` to retrieve the result

1b. attachToRunningJob(..)
Re-attach to a running job, reconstruct its class loader, and return a
future which can be passed to `awaitJobResult`

2. awaitJobResult(..)
Blocks until the future returned by either `submitJob` or
`attachToRunningJob` has completed and returns the job's result
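
A rough usage sketch of the submission path (1a followed by 2), assuming
the caller has already set up `actorSystem`, `leaderRetrievalService`,
`jobGraph`, `timeout` and `classLoader` (parameter order as introduced in
this commit):

    // phase 1a: submit the job without blocking
    JobListeningContext context = JobClient.submitJob(
        actorSystem,             // ActorSystem used for the communication
        leaderRetrievalService,  // finds the current leading JobManager
        jobGraph,                // the Flink job to submit
        timeout,                 // ask timeout for futures
        true,                    // print status updates to sysout
        classLoader);            // class loader for deserializing the result

    // phase 2: block until the job has finished
    JobExecutionResult result = JobClient.awaitJobResult(context);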

- split up JobClientActor into a base class and two implementations
- JobClient: check the JobClientActor's liveness while waiting for the result
- lazily reconstruct user class loader
- add additional tests for JobClientActor
- add a test case for resuming running jobs
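
Re-attaching to an already running job (1b followed by 2) works analogously;
this is roughly what the new ClusterClient#retrieveJob(JobID) does, and
`jobManagerGateway`, `flinkConfig`, `actorSystem`, `leaderRetrievalService`
and `timeout` are assumed to be available to the caller:

    // phase 1b: attach to the running job identified by jobID
    JobListeningContext context = JobClient.attachToRunningJob(
        jobID,                   // id of the running job
        jobManagerGateway,       // gateway to the current JobManager
        flinkConfig,             // used to reconstruct the user class loader
        actorSystem,
        leaderRetrievalService,
        timeout,
        true);                   // print status updates to sysout

    // phase 2: block until the job has finished; the user class loader is
    // reconstructed lazily when the result needs to be deserialized
    JobExecutionResult result = JobClient.awaitJobResult(context);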

This closes #2313


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/259a3a55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/259a3a55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/259a3a55

Branch: refs/heads/flip-6
Commit: 259a3a5569952458140afc8e9ad96eac0c330162
Parents: 444315a
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Aug 18 16:04:35 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Aug 25 15:46:15 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  41 ++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../flink/api/common/JobExecutionResult.java    |   4 +-
 .../client/JobAttachmentClientActor.java        | 171 +++++++++++
 .../apache/flink/runtime/client/JobClient.java  | 292 +++++++++++++++----
 .../flink/runtime/client/JobClientActor.java    | 281 ++++++------------
 ...ClientActorRegistrationTimeoutException.java |  35 +++
 .../runtime/client/JobListeningContext.java     | 145 +++++++++
 .../runtime/client/JobRetrievalException.java   |  42 +++
 .../client/JobSubmissionClientActor.java        | 192 ++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |   1 +
 .../flink/runtime/jobmanager/JobInfo.scala      |  62 +++-
 .../flink/runtime/jobmanager/JobManager.scala   | 161 +++++-----
 .../runtime/messages/JobClientMessages.scala    |  23 +-
 .../runtime/messages/JobManagerMessages.scala   |  48 ++-
 .../testingUtils/TestingJobManagerLike.scala    |  12 +-
 .../TestingJobManagerMessages.scala             |   6 +
 .../runtime/client/JobClientActorTest.java      | 190 +++++++++++-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |   3 +-
 .../clients/examples/JobRetrievalITCase.java    | 138 +++++++++
 20 files changed, 1499 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c3c666b..292da70 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -47,6 +47,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.client.JobRetrievalException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -429,6 +431,39 @@ public abstract class ClusterClient {
 	}
 
 	/**
+	 * Reattaches to a running job using the supplied job id
+	 * @param jobID The job id of the job to attach to
+	 * @return The JobExecutionResult for the jobID
+	 * @throws JobExecutionException if an error occurs while monitoring the job execution
+	 */
+	public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
+		final LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not create the leader retrieval service", e);
+		}
+
+		ActorGateway jobManagerGateway;
+		try {
+			jobManagerGateway = getJobManagerGateway();
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway");
+		}
+
+		final JobListeningContext listeningContext = JobClient.attachToRunningJob(
+				jobID,
+				jobManagerGateway,
+				flinkConfig,
+				actorSystemLoader.get(),
+				leaderRetrievalService,
+				timeout,
+				printStatusDuringExecution);
+
+		return JobClient.awaitJobResult(listeningContext);
+	}
+
+	/**
 	 * Cancels a job identified by the job id.
 	 * @param jobId the job id
 	 * @throws Exception In case an error occurred.
@@ -446,11 +481,11 @@ public abstract class ClusterClient {
 		final Object result = Await.result(response, timeout);
 
 		if (result instanceof JobManagerMessages.CancellationSuccess) {
-			LOG.info("Job cancellation with ID " + jobId + " succeeded.");
+			logAndSysout("Job cancellation with ID " + jobId + " succeeded.");
 		} else if (result instanceof JobManagerMessages.CancellationFailure) {
 			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-			LOG.info("Job cancellation with ID " + jobId + " failed.", t);
-			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+			logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage());
+			throw new Exception("Failed to cancel the job with id " + jobId, t);
 		} else {
 			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-clients/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/log4j-test.properties b/flink-clients/src/test/resources/log4j-test.properties
index 85897b3..5100c1f 100644
--- a/flink-clients/src/test/resources/log4j-test.properties
+++ b/flink-clients/src/test/resources/log4j-test.properties
@@ -27,4 +27,4 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index bc5ae09..cb4ecc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -34,7 +34,7 @@ public class JobExecutionResult extends JobSubmissionResult {
 
 	private long netRuntime;
 
-	private Map<String, Object> accumulatorResults = Collections.emptyMap();
+	private final Map<String, Object> accumulatorResults;
 
 	/**
 	 * Creates a new JobExecutionResult.
@@ -49,6 +49,8 @@ public class JobExecutionResult extends JobSubmissionResult {
 
 		if (accumulators != null) {
 			this.accumulatorResults = accumulators;
+		} else {
+			this.accumulatorResults = Collections.emptyMap();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
new file mode 100644
index 0000000..5446002
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.dispatch.Futures;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+
+
+/**
+ * Actor which handles Job attachment process and provides Job updates until completion.
+ */
+public class JobAttachmentClientActor extends JobClientActor {
+
+	/** JobID to attach to when the JobClientActor retrieves a job */
+	private JobID jobID;
+	/** true if a JobRegistrationSuccess message has been received */
+	private boolean successfullyRegisteredForJob = false;
+
+	public JobAttachmentClientActor(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutUpdates) {
+		super(leaderRetrievalService, timeout, sysoutUpdates);
+	}
+
+	@Override
+	public void connectedToJobManager() {
+		if (jobID != null && !successfullyRegisteredForJob) {
+			tryToAttachToJob();
+		}
+	}
+
+	@Override
+	protected Class getClientMessageClass() {
+		return AttachToJobAndWait.class;
+	}
+
+	@Override
+	public void handleCustomMessage(Object message) {
+		if (message instanceof AttachToJobAndWait) {
+			// sanity check that no job registration was performed through this actor before -
+			// it is a one-shot actor after all
+			if (this.client == null) {
+				jobID = ((AttachToJobAndWait) message).jobID();
+				if (jobID == null) {
+					LOG.error("Received null JobID");
+					sender().tell(
+						decorateMessage(new Status.Failure(new Exception("JobID is null"))),
+						getSelf());
+				} else {
+					LOG.info("Received JobID {}.", jobID);
+
+					this.client = getSender();
+
+					// is only successful if we already know the job manager leader
+					if (jobManager != null) {
+						tryToAttachToJob();
+					}
+				}
+			} else {
+				// repeated submission - tell failure to sender and kill self
+				String msg = "Received repeated 'AttachToJobAndWait'";
+				LOG.error(msg);
+				getSender().tell(
+					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
+
+				terminate();
+			}
+		}
+		else if (message instanceof JobManagerMessages.RegisterJobClientSuccess) {
+			// job registration was successful :o)
+			JobManagerMessages.RegisterJobClientSuccess msg = ((JobManagerMessages.RegisterJobClientSuccess) message);
+			logAndPrintMessage("Successfully registered at the JobManager for Job " + msg.jobId());
+			successfullyRegisteredForJob = true;
+		}
+		else if (message instanceof JobManagerMessages.JobNotFound) {
+			LOG.info("Couldn't register JobClient for JobID {}",
+				((JobManagerMessages.JobNotFound) message).jobID());
+			client.tell(decorateMessage(message), getSelf());
+			terminate();
+		}
+		else if (JobClientMessages.getRegistrationTimeout().equals(message)) {
+			// check if our registration for a job was successful in the meantime
+			if (!successfullyRegisteredForJob) {
+				if (isClientConnected()) {
+					client.tell(
+						decorateMessage(new Status.Failure(
+							new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager " +
+								"timed out. " +	"You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT +
+								"' in case the JobManager needs more time to confirm the job client registration."))),
+						getSelf());
+				}
+
+				// We haven't heard back from the job manager after attempting registration for a job
+				// therefore terminate
+				terminate();
+			}
+		} else {
+			LOG.error("{} received unknown message: ", getClass());
+		}
+
+	}
+
+	private void tryToAttachToJob() {
+		LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID);
+
+		Futures.future(new Callable<Object>() {
+			@Override
+			public Object call() throws Exception {
+				LOG.info("Attaching to job {} at the job manager {}.", jobID, jobManager.path());
+
+				jobManager.tell(
+					decorateMessage(
+						new JobManagerMessages.RegisterJobClient(
+							jobID,
+							ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+					getSelf());
+
+				// issue a RegistrationTimeout message to check that we submit the job within
+				// the given timeout
+				getContext().system().scheduler().scheduleOnce(
+					timeout,
+					getSelf(),
+					decorateMessage(JobClientMessages.getRegistrationTimeout()),
+					getContext().dispatcher(),
+					ActorRef.noSender());
+
+				return null;
+			}
+		}, getContext().dispatcher());
+	}
+
+	public static Props createActorProps(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutUpdates) {
+		return Props.create(
+			JobAttachmentClientActor.class,
+			leaderRetrievalService,
+			timeout,
+			sysoutUpdates);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c0e0d08..4e916eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.client;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.pattern.Patterns;
@@ -30,6 +31,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -44,10 +47,15 @@ import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -80,28 +88,18 @@ public class JobClient {
 	}
 
 	/**
-	 * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to
-	 * the JobManager. The method blocks until the job has finished or the JobManager is no longer
-	 * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter
-	 * case a [[JobExecutionException]] is thrown.
-	 *
-	 * @param actorSystem The actor system that performs the communication.
-	 * @param leaderRetrievalService Leader retrieval service which used to find the current leading
-	 *                               JobManager
-	 * @param jobGraph    JobGraph describing the Flink job
-	 * @param timeout     Timeout for futures
-	 * @param sysoutLogUpdates prints log updates to system out if true
-	 * @return The job execution result
-	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
-	 *                                                               execution fails.
+	 * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
+	 * passed to {@code awaitJobResult} to get the result of the submission.
+	 * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
+	 * 			{@code awaitJobResult(JobListeningContext context)}.
 	 */
-	public static JobExecutionResult submitJobAndWait(
+	public static JobListeningContext submitJob(
 			ActorSystem actorSystem,
 			LeaderRetrievalService leaderRetrievalService,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates,
-			ClassLoader classLoader) throws JobExecutionException {
+			ClassLoader classLoader) {
 
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
 		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
@@ -112,29 +110,187 @@ public class JobClient {
 		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
 		// update messages, watches for disconnect between client and JobManager, ...
 
-		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
 			leaderRetrievalService,
 			timeout,
 			sysoutLogUpdates);
 
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-		
-		// first block handles errors while waiting for the result
-		Object answer;
+
+		Future<Object> submissionFuture = Patterns.ask(
+				jobClientActor,
+				new JobClientMessages.SubmitJobAndWait(jobGraph),
+				new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+		return new JobListeningContext(
+				jobGraph.getJobID(),
+				submissionFuture,
+				jobClientActor,
+				timeout,
+				classLoader);
+	}
+
+
+	/**
+	 * Attaches to a running Job using the JobID.
+	 * Reconstructs the user class loader by downloading the jars from the JobManager.
+	 */
+	public static JobListeningContext attachToRunningJob(
+			JobID jobID,
+			ActorGateway jobManagerGateWay,
+			Configuration configuration,
+			ActorSystem actorSystem,
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutLogUpdates) {
+
+		checkNotNull(jobID, "The jobID must not be null.");
+		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
+		checkNotNull(configuration, "The configuration must not be null.");
+		checkNotNull(actorSystem, "The actorSystem must not be null.");
+		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+		checkNotNull(timeout, "The timeout must not be null.");
+
+		// we create a proxy JobClientActor that deals with all communication with
+		// the JobManager. It forwards the job attachments, checks the success/failure responses, logs
+		// update messages, watches for disconnect between client and JobManager, ...
+		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
+			leaderRetrievalService,
+			timeout,
+			sysoutLogUpdates);
+
+		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
+
+		Future<Object> attachmentFuture = Patterns.ask(
+				jobClientActor,
+				new JobClientMessages.AttachToJobAndWait(jobID),
+				new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+		return new JobListeningContext(
+				jobID,
+				attachmentFuture,
+				jobClientActor,
+				timeout,
+				actorSystem,
+				configuration);
+	}
+
+	/**
+	 * Reconstructs the class loader by first requesting information about it at the JobManager
+	 * and then downloading missing jar files.
+	 * @param jobID id of job
+	 * @param jobManager gateway to the JobManager
+	 * @param config the flink configuration
+	 * @return A classloader that should behave like the original classloader
+	 * @throws JobRetrievalException if anything goes wrong
+	 */
+	public static ClassLoader retrieveClassLoader(
+		JobID jobID,
+		ActorGateway jobManager,
+		Configuration config)
+		throws JobRetrievalException {
+
+		final Object jmAnswer;
 		try {
-			Future<Object> future = Patterns.ask(jobClientActor,
-					new JobClientMessages.SubmitJobAndWait(jobGraph),
-					new Timeout(AkkaUtils.INF_TIMEOUT()));
-			
-			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
+			jmAnswer = Await.result(
+				jobManager.ask(
+					new JobManagerMessages.RequestClassloadingProps(jobID),
+					AkkaUtils.getDefaultTimeout()),
+				AkkaUtils.getDefaultTimeout());
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
 		}
-		catch (TimeoutException e) {
-			throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. " +
-					"Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
+
+		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
+			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
+
+			Option<String> jmHost = jobManager.actor().path().address().host();
+			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
+			final BlobCache blobClient = new BlobCache(serverAddress, config);
+
+			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
+			final List<URL> requiredClasspaths = props.requiredClasspaths();
+
+			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+
+			int pos = 0;
+			for (BlobKey blobKey : props.requiredJarFiles()) {
+				try {
+					allURLs[pos++] = blobClient.getURL(blobKey);
+				} catch (Exception e) {
+					blobClient.shutdown();
+					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
+				}
+			}
+
+			for (URL url : requiredClasspaths) {
+				allURLs[pos++] = url;
+			}
+
+			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
+		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
+			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
+		} else {
+			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
+		}
+	}
+
+	/**
+	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to
+	 * @param listeningContext The listening context of the job execution
+	 * @return The result of the execution
+	 * @throws JobExecutionException if anything goes wrong while monitoring the job
+	 */
+	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
+
+		final JobID jobID = listeningContext.getJobID();
+		final ActorRef jobClientActor = listeningContext.getJobClientActor();
+		final Future<Object> jobSubmissionFuture = listeningContext.getJobResultFuture();
+		final FiniteDuration askTimeout = listeningContext.getTimeout();
+		// retrieves class loader if necessary
+		final ClassLoader classLoader = listeningContext.getClassLoader();
+
+		// wait for the future which holds the result to be ready
+		// ping the JobClientActor from time to time to check if it is still running
+		while (!jobSubmissionFuture.isCompleted()) {
+			try {
+				Await.ready(jobSubmissionFuture, askTimeout);
+			} catch (InterruptedException e) {
+				throw new JobExecutionException(
+					jobID,
+					"Interrupted while waiting for job completion.");
+			} catch (TimeoutException e) {
+				try {
+					Await.result(
+						Patterns.ask(
+							jobClientActor,
+							// Ping the Actor to see if it is alive
+							new Identify(true),
+							Timeout.durationToTimeout(askTimeout)),
+						askTimeout);
+					// we got a reply, continue waiting for the job result
+				} catch (Exception eInner) {
+					// we could have a result but the JobClientActor might have been killed and
+					// thus the health check failed
+					if (!jobSubmissionFuture.isCompleted()) {
+						throw new JobExecutionException(
+							jobID,
+							"JobClientActor seems to have died before the JobExecutionResult could be retrieved.",
+							eInner);
+					}
+				}
+			}
+		}
+
+		final Object answer;
+		try {
+			// we have already awaited the result, zero time to wait here
+			answer = Await.result(jobSubmissionFuture, Duration.Zero());
 		}
 		catch (Throwable throwable) {
-			throw new JobExecutionException(jobGraph.getJobID(),
-					"Communication with JobManager failed: " + throwable.getMessage(), throwable);
+			throw new JobExecutionException(jobID,
+				"Couldn't retrieve the JobExecutionResult from the JobManager.", throwable);
 		}
 		finally {
 			// failsafe shutdown of the client actor
@@ -149,18 +305,16 @@ public class JobClient {
 			if (result != null) {
 				try {
 					return result.toJobExecutionResult(classLoader);
+				} catch (Throwable t) {
+					throw new JobExecutionException(jobID,
+						"Job was successfully executed but JobExecutionResult could not be deserialized.");
 				}
-				catch (Throwable t) {
-					throw new JobExecutionException(jobGraph.getJobID(),
-							"Job was successfully executed but JobExecutionResult could not be deserialized.");
-				}
-			}
-			else {
-				throw new JobExecutionException(jobGraph.getJobID(),
-						"Job was successfully executed but result contained a null JobExecutionResult.");
+			} else {
+				throw new JobExecutionException(jobID,
+					"Job was successfully executed but result contained a null JobExecutionResult.");
 			}
 		}
-		if (answer instanceof JobManagerMessages.JobResultFailure) {
+		else if (answer instanceof JobManagerMessages.JobResultFailure) {
 			LOG.info("Job execution failed");
 
 			SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
@@ -168,23 +322,62 @@ public class JobClient {
 				Throwable cause = serThrowable.deserializeError(classLoader);
 				if (cause instanceof JobExecutionException) {
 					throw (JobExecutionException) cause;
+				} else {
+					throw new JobExecutionException(jobID, "Job execution failed", cause);
 				}
-				else {
-					throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
-				}
-			}
-			else {
-				throw new JobExecutionException(jobGraph.getJobID(),
-						"Job execution failed with null as failure cause.");
+			} else {
+				throw new JobExecutionException(jobID,
+					"Job execution failed with null as failure cause.");
 			}
 		}
+		else if (answer instanceof JobManagerMessages.JobNotFound) {
+			throw new JobRetrievalException(
+				((JobManagerMessages.JobNotFound) answer).jobID(),
+				"Couldn't retrieve Job " + jobID + " because it was not running.");
+		}
 		else {
-			throw new JobExecutionException(jobGraph.getJobID(),
-					"Unknown answer from JobManager after submitting the job: " + answer);
+			throw new JobExecutionException(jobID,
+				"Unknown answer from JobManager after submitting the job: " + answer);
 		}
 	}
 
 	/**
+	 * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to
+	 * the JobManager. The method blocks until the job has finished or the JobManager is no longer
+	 * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter
+	 * case a [[JobExecutionException]] is thrown.
+	 *
+	 * @param actorSystem The actor system that performs the communication.
+	 * @param leaderRetrievalService Leader retrieval service which is used to find the current leading
+	 *                               JobManager
+	 * @param jobGraph    JobGraph describing the Flink job
+	 * @param timeout     Timeout for futures
+	 * @param sysoutLogUpdates prints log updates to system out if true
+	 * @param classLoader The class loader for deserializing the results
+	 * @return The job execution result
+	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
+	 *                                                               execution fails.
+	 */
+	public static JobExecutionResult submitJobAndWait(
+			ActorSystem actorSystem,
+			LeaderRetrievalService leaderRetrievalService,
+			JobGraph jobGraph,
+			FiniteDuration timeout,
+			boolean sysoutLogUpdates,
+			ClassLoader classLoader) throws JobExecutionException {
+
+		JobListeningContext jobListeningContext = submitJob(
+				actorSystem,
+				leaderRetrievalService,
+				jobGraph,
+				timeout,
+				sysoutLogUpdates,
+				classLoader);
+
+		return awaitJobResult(jobListeningContext);
+	}
+
+	/**
 	 * Submits a job in detached mode. The method sends the JobGraph to the
 	 * JobManager and waits for the answer whether the job could be started or not.
 	 *
@@ -227,7 +420,7 @@ public class JobClient {
 					"JobManager did not respond within " + timeout.toString(), e);
 		}
 		catch (Throwable t) {
-			throw new JobExecutionException(jobGraph.getJobID(),
+			throw new JobSubmissionException(jobGraph.getJobID(),
 					"Failed to send job to JobManager: " + t.getMessage(), t.getCause());
 		}
 
@@ -258,4 +451,5 @@ public class JobClient {
 			throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 9379c30..1380e76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -20,18 +20,11 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Terminated;
-import akka.dispatch.Futures;
 import akka.dispatch.OnSuccess;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -39,47 +32,39 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress;
-import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+
 
 /**
- * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
- * is used to submit jobs to the JobManager and to request the port of the BlobManager.
+ * Actor which constitutes the bridge between the non-actor code and the JobManager.
+ * This base class handles the connection to the JobManager and notifies in case of timeouts. It also
+ * receives and prints job updates until job completion.
  */
-public class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener {
+public abstract class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener {
 
 	private final LeaderRetrievalService leaderRetrievalService;
 
 	/** timeout for futures */
-	private final FiniteDuration timeout;
+	protected final FiniteDuration timeout;
 
 	/** true if status messages shall be printed to sysout */
 	private final boolean sysoutUpdates;
 
-	/** true if a SubmitJobSuccess message has been received */
-	private boolean jobSuccessfullySubmitted = false;
-
-	/** true if a PoisonPill was taken */
-	private boolean terminated = false;
+	/** true if a PoisonPill about to be taken */
+	private boolean toBeTerminated = false;
 
 	/** ActorRef to the current leader */
-	private ActorRef jobManager;
+	protected ActorRef jobManager;
 
 	/** leader session ID of the JobManager when this actor was created */
-	private UUID leaderSessionID;
-
-	/** Actor which submits a job to the JobManager via this actor */
-	private ActorRef submitter;
+	protected UUID leaderSessionID;
 
-	/** JobGraph which shall be submitted to the JobManager */
-	private JobGraph jobGraph;
+	/** The client which the actor is responsible for */
+	protected ActorRef client;
 
 	public JobClientActor(
 			LeaderRetrievalService leaderRetrievalService,
@@ -109,9 +94,27 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 		}
 	}
 
+	/**
+	 * Hook to be called once a connection has been established with the JobManager.
+	 */
+	protected abstract void connectedToJobManager();
+
+	/**
+	 * Hook to handle custom client messages which are not handled by the base class.
+	 * @param message The message to be handled
+	 */
+	protected abstract void handleCustomMessage(Object message);
+
+	/**
+	 * Hook to let the client know about messages that should start a timer for a timeout
+	 * @return The message class after which a timeout should be started
+	 */
+	protected abstract Class getClientMessageClass();
+
+
 	@Override
 	protected void handleMessage(Object message) {
-		
+
 		// =========== State Change Messages ===============
 
 		if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
@@ -149,79 +152,31 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 			JobManagerActorRef msg = (JobManagerActorRef) message;
 			connectToJobManager(msg.jobManager());
 
-			logAndPrintMessage("Connected to JobManager at " +  msg.jobManager());
+			logAndPrintMessage("Connected to JobManager at " + msg.jobManager());
 
-			if (jobGraph != null && !jobSuccessfullySubmitted) {
-				// if we haven't yet submitted the job successfully
-				tryToSubmitJob(jobGraph);
-			}
+			connectedToJobManager();
 		}
 
 		// =========== Job Life Cycle Messages ===============
-		
-		// submit a job to the JobManager
-		else if (message instanceof SubmitJobAndWait) {
-			// only accept SubmitJobWait messages if we're not about to terminate
-			if (!terminated) {
-				// sanity check that this no job was submitted through this actor before -
-				// it is a one-shot actor after all
-				if (this.submitter == null) {
-					jobGraph = ((SubmitJobAndWait) message).jobGraph();
-					if (jobGraph == null) {
-						LOG.error("Received null JobGraph");
-						sender().tell(
-							decorateMessage(new Status.Failure(new Exception("JobGraph is null"))),
-							getSelf());
-					} else {
-						LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
-
-						this.submitter = getSender();
-
-						// is only successful if we already know the job manager leader
-						tryToSubmitJob(jobGraph);
-					}
-				} else {
-					// repeated submission - tell failure to sender and kill self
-					String msg = "Received repeated 'SubmitJobAndWait'";
-					LOG.error(msg);
-					getSender().tell(
-						decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
-
-					terminate();
-				}
-			} else {
-				// we're about to receive a PoisonPill because terminated == true
-				String msg = getClass().getName() + " is about to be terminated. Therefore, the " +
-					"job submission cannot be executed.";
-				LOG.error(msg);
-				getSender().tell(
-					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
-			}
-		}
+
 		// acknowledgement to submit job is only logged, our original
-		// submitter is only interested in the final job result
-		else if (message instanceof JobManagerMessages.JobResultSuccess ||
-				message instanceof JobManagerMessages.JobResultFailure) {
-			
+		// client is only interested in the final job result
+		else if (message instanceof JobManagerMessages.JobResultMessage) {
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received {} message from JobManager", message.getClass().getSimpleName());
 			}
 
-			// forward the success to the original job submitter
-			if (hasJobBeenSubmitted()) {
-				this.submitter.tell(decorateMessage(message), getSelf());
+			// forward the success to the original client
+			if (isClientConnected()) {
+				this.client.tell(decorateMessage(message), getSelf());
 			}
 
 			terminate();
 		}
-		else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
-			// job was successfully submitted :-)
-			LOG.info("Job was successfully submitted to the JobManager {}.", getSender().path());
-			jobSuccessfullySubmitted = true;
-		}
 
 		// =========== Actor / Communication Failure / Timeouts ===============
-		
+
 		else if (message instanceof Terminated) {
 			ActorRef target = ((Terminated) message).getActor();
 			if (jobManager.equals(target)) {
@@ -234,7 +189,7 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 				// Important: The ConnectionTimeout message is filtered out in case that we are
 				// notified about a new leader by setting the new leader session ID, because
 				// ConnectionTimeout extends RequiresLeaderSessionID
-				if (hasJobBeenSubmitted()) {
+				if (isClientConnected()) {
 					getContext().system().scheduler().scheduleOnce(
 						timeout,
 						getSelf(),
@@ -245,49 +200,61 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 			} else {
 				LOG.warn("Received 'Terminated' for unknown actor " + target);
 			}
-		} else if (JobClientMessages.getConnectionTimeout().equals(message)) {
+		}
+		else if (JobClientMessages.getConnectionTimeout().equals(message)) {
 			// check if we haven't found a job manager yet
-			if (!isConnected()) {
-				if (hasJobBeenSubmitted()) {
-					submitter.tell(
-						decorateMessage(new Status.Failure(
-							new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))),
+			if (!isJobManagerConnected()) {
+				final JobClientActorConnectionTimeoutException errorMessage =
+					new JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
+				final Object replyMessage = decorateMessage(new Status.Failure(errorMessage));
+				if (isClientConnected()) {
+					client.tell(
+						replyMessage,
 						getSelf());
 				}
 				// Connection timeout reached, let's terminate
 				terminate();
 			}
-		} else if (JobClientMessages.getSubmissionTimeout().equals(message)) {
-			// check if our job submission was successful in the meantime
-			if (!jobSuccessfullySubmitted) {
-				if (hasJobBeenSubmitted()) {
-					submitter.tell(
-						decorateMessage(new Status.Failure(
-							new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " +
-								"You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " +
-								"needs more time to configure and confirm the job submission."))),
-						getSelf());
-				}
-
-				// We haven't heard back from the job manager after sending the job graph to him,
-				// therefore terminate
-				terminate();
-			}
 		}
 
-		// =========== Unknown Messages ===============
-		
+		// =========== Message Delegation ===============
+
+		else if (!isJobManagerConnected() && getClientMessageClass().equals(message.getClass())) {
+			LOG.info(
+				"Received {} but there is no connection to a JobManager yet.",
+				message);
+			// We want to submit/attach to a job, but we haven't found a job manager yet.
+			// Let's give him another chance to find a job manager within the given timeout.
+			getContext().system().scheduler().scheduleOnce(
+				timeout,
+				getSelf(),
+				decorateMessage(JobClientMessages.getConnectionTimeout()),
+				getContext().dispatcher(),
+				ActorRef.noSender()
+			);
+			handleCustomMessage(message);
+		}
 		else {
-			LOG.error("JobClient received unknown message: " + message);
+			if (!toBeTerminated) {
+				handleCustomMessage(message);
+			} else {
+				// we're about to receive a PoisonPill because toBeTerminated == true
+				String msg = getClass().getName() + " is about to be terminated. Therefore, the " +
+					"job submission cannot be executed.";
+				LOG.error(msg);
+				getSender().tell(
+					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
+			}
 		}
 	}
 
+
 	@Override
 	protected UUID getLeaderSessionID() {
 		return leaderSessionID;
 	}
 
-	private void logAndPrintMessage(String message) {
+	protected void logAndPrintMessage(String message) {
 		LOG.info(message);
 		if (sysoutUpdates) {
 			System.out.println(message);
@@ -351,97 +318,19 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
 		getContext().watch(jobManager);
 	}
 
-	private void tryToSubmitJob(final JobGraph jobGraph) {
-		this.jobGraph = jobGraph;
-
-		if (isConnected()) {
-			LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
-				jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
-
-			Futures.future(new Callable<Object>() {
-				@Override
-				public Object call() throws Exception {
-					ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
-
-					LOG.info("Upload jar files to job manager {}.", jobManager.path());
-
-					try {
-						jobGraph.uploadUserJars(jobManagerGateway, timeout);
-					} catch (IOException exception) {
-						getSelf().tell(
-							decorateMessage(new JobManagerMessages.JobResultFailure(
-								new SerializedThrowable(
-									new JobSubmissionException(
-										jobGraph.getJobID(),
-										"Could not upload the jar files to the job manager.",
-										exception)
-								)
-							)),
-							ActorRef.noSender()
-						);
-					}
-
-					LOG.info("Submit job to the job manager {}.", jobManager.path());
-
-					jobManager.tell(
-						decorateMessage(
-							new JobManagerMessages.SubmitJob(
-								jobGraph,
-								ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
-						getSelf());
-
-					// issue a SubmissionTimeout message to check that we submit the job within
-					// the given timeout
-					getContext().system().scheduler().scheduleOnce(
-						timeout,
-						getSelf(),
-						decorateMessage(JobClientMessages.getSubmissionTimeout()),
-						getContext().dispatcher(),
-						ActorRef.noSender());
-
-					return null;
-				}
-			}, getContext().dispatcher());
-		} else {
-			LOG.info("Could not submit job {} ({}), because there is no connection to a " +
-					"JobManager.",
-				jobGraph.getName(), jobGraph.getJobID());
-
-			// We want to submit a job, but we haven't found a job manager yet.
-			// Let's give him another chance to find a job manager within the given timeout.
-			getContext().system().scheduler().scheduleOnce(
-				timeout,
-				getSelf(),
-				decorateMessage(JobClientMessages.getConnectionTimeout()),
-				getContext().dispatcher(),
-				ActorRef.noSender()
-			);
-		}
-	}
-
-	private void terminate() {
+	protected void terminate() {
 		LOG.info("Terminate JobClientActor.");
-		terminated = true;
+		toBeTerminated = true;
 		disconnectFromJobManager();
 		getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
 	}
 
-	private boolean isConnected() {
+	private boolean isJobManagerConnected() {
 		return jobManager != ActorRef.noSender();
 	}
 
-	private boolean hasJobBeenSubmitted() {
-		return submitter != ActorRef.noSender();
+	protected boolean isClientConnected() {
+		return client != ActorRef.noSender();
 	}
 
-	public static Props createJobClientActorProps(
-			LeaderRetrievalService leaderRetrievalService,
-			FiniteDuration timeout,
-			boolean sysoutUpdates) {
-		return Props.create(
-			JobClientActor.class,
-			leaderRetrievalService,
-			timeout,
-			sysoutUpdates);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
new file mode 100644
index 0000000..e57d1b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+/**
+ * Exception which is thrown by the {@link JobClientActor} if it has not heard back from the job
+ * manager after it has attempted to register for a job within a given timeout interval.
+ */
+public class JobClientActorRegistrationTimeoutException extends Exception {
+	private static final long serialVersionUID = 8762463142030454853L;
+
+	public JobClientActorRegistrationTimeoutException(String msg) {
+		super(msg);
+	}
+
+	public JobClientActorRegistrationTimeoutException(String msg, Throwable cause) {
+		super(msg, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
new file mode 100644
index 0000000..b5d7cb7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The JobListeningContext holds the state necessary to monitor a running job and receive its results.
+ */
+public final class JobListeningContext {
+
+	private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	/** The Job id of the Job */
+	private final JobID jobID;
+	/** The Future which is completed upon job completion */
+	private final Future<Object> jobResultFuture;
+	/** The JobClientActor which handles communication and monitoring of the job */
+	private final ActorRef jobClientActor;
+	/** Timeout used for Asks */
+	private final FiniteDuration timeout;
+
+	/** ActorSystem for leader retrieval */
+	private ActorSystem actorSystem;
+	/** Flink configuration for initializing the BlobService */
+	private Configuration configuration;
+
+	/** The class loader (either provided at job submission or reconstructed when it is needed) */
+	private ClassLoader classLoader;
+
+	/**
+	 * Constructor to use when the class loader is available.
+	 */
+	public JobListeningContext(
+		JobID jobID,
+		Future<Object> jobResultFuture,
+		ActorRef jobClientActor,
+		FiniteDuration timeout,
+		ClassLoader classLoader) {
+		this.jobID = checkNotNull(jobID);
+		this.jobResultFuture = checkNotNull(jobResultFuture);
+		this.jobClientActor = checkNotNull(jobClientActor);
+		this.timeout = checkNotNull(timeout);
+		this.classLoader = checkNotNull(classLoader);
+	}
+
+	/**
+	 * Constructor to use when the class loader is not available.
+	 */
+	public JobListeningContext(
+		JobID jobID,
+		Future<Object> jobResultFuture,
+		ActorRef jobClientActor,
+		FiniteDuration timeout,
+		ActorSystem actorSystem,
+		Configuration configuration) {
+		this.jobID = checkNotNull(jobID);
+		this.jobResultFuture = checkNotNull(jobResultFuture);
+		this.jobClientActor = checkNotNull(jobClientActor);
+		this.timeout = checkNotNull(timeout);
+		this.actorSystem = checkNotNull(actorSystem);
+		this.configuration = checkNotNull(configuration);
+	}
+
+	/**
+	 * @return The Job ID that this context is bound to.
+	 */
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	/**
+	 * @return The Future that eventually holds the result of the execution.
+	 */
+	public Future<Object> getJobResultFuture() {
+		return jobResultFuture;
+	}
+
+	/**
+	 * @return The Job Client actor which communicates with the JobManager.
+	 */
+	public ActorRef getJobClientActor() {
+		return jobClientActor;
+	}
+
+	/**
+	 * @return The default timeout of Akka asks
+	 */
+	public FiniteDuration getTimeout() {
+		return timeout;
+	}
+
+	/**
+	 * The class loader necessary to deserialize the result of a job execution,
+	 * i.e. JobExecutionResult or Exceptions
+	 * @return The class loader for the job id
+	 * @throws JobRetrievalException if anything goes wrong
+	 */
+	public ClassLoader getClassLoader() throws JobRetrievalException {
+		if (classLoader == null) {
+			// lazily initializes the class loader when it is needed
+			classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration);
+			LOG.info("Reconstructed class loader for Job {}", jobID);
+		}
+		return classLoader;
+	}
+
+	private ActorGateway getJobManager() throws JobRetrievalException {
+		try {
+			return LeaderRetrievalUtils.retrieveLeaderGateway(
+				LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+				actorSystem,
+				AkkaUtils.getLookupTimeout(configuration));
+		} catch (Exception e) {
+			throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
new file mode 100644
index 0000000..a92bddc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Exception used to indicate that a job couldn't be retrieved from the JobManager
+ */
+public class JobRetrievalException extends JobExecutionException {
+
+	private static final long serialVersionUID = -42L;
+
+	public JobRetrievalException(JobID jobID, String msg, Throwable cause) {
+		super(jobID, msg, cause);
+	}
+
+	public JobRetrievalException(JobID jobID, String msg) {
+		super(jobID, msg);
+	}
+
+	public JobRetrievalException(JobID jobID, Throwable cause) {
+		super(jobID, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
new file mode 100644
index 0000000..2cc4a50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.dispatch.Futures;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+
+/**
+ * Actor which handles the job submission process and provides job status updates until completion.
+ */
+public class JobSubmissionClientActor extends JobClientActor {
+
+	/** JobGraph which shall be submitted to the JobManager */
+	private JobGraph jobGraph;
+	/** true if a SubmitJobSuccess message has been received */
+	private boolean jobSuccessfullySubmitted = false;
+
+	public JobSubmissionClientActor(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutUpdates) {
+		super(leaderRetrievalService, timeout, sysoutUpdates);
+	}
+
+
+	@Override
+	public void connectedToJobManager() {
+		if (jobGraph != null && !jobSuccessfullySubmitted) {
+			// if we haven't yet submitted the job successfully
+			tryToSubmitJob();
+		}
+	}
+
+	@Override
+	protected Class getClientMessageClass() {
+		return SubmitJobAndWait.class;
+	}
+
+	@Override
+	public void handleCustomMessage(Object message) {
+		// submit a job to the JobManager
+		if (message instanceof SubmitJobAndWait) {
+			// sanity check that no job was submitted through this actor before -
+			// it is a one-shot actor after all
+			if (this.client == null) {
+				jobGraph = ((SubmitJobAndWait) message).jobGraph();
+				if (jobGraph == null) {
+					LOG.error("Received null JobGraph");
+					sender().tell(
+						decorateMessage(new Status.Failure(new Exception("JobGraph is null"))),
+						getSelf());
+				} else {
+					LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+					this.client = getSender();
+
+					// is only successful if we already know the job manager leader
+					if (jobManager != null) {
+						tryToSubmitJob();
+					}
+				}
+			} else {
+				// repeated submission - tell failure to sender and kill self
+				String msg = "Received repeated 'SubmitJobAndWait'";
+				LOG.error(msg);
+				getSender().tell(
+					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
+
+				terminate();
+			}
+		} else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
+			// job was successfully submitted :-)
+			LOG.info("Job {} was successfully submitted to the JobManager {}.",
+				((JobManagerMessages.JobSubmitSuccess) message).jobId(),
+				getSender().path());
+			jobSuccessfullySubmitted = true;
+		} else if (JobClientMessages.getSubmissionTimeout().equals(message)) {
+			// check if our job submission was successful in the meantime
+			if (!jobSuccessfullySubmitted) {
+				if (isClientConnected()) {
+					client.tell(
+						decorateMessage(new Status.Failure(
+							new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " +
+								"You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " +
+								"needs more time to configure and confirm the job submission."))),
+						getSelf());
+				}
+
+				// We haven't heard back from the job manager after sending the job graph to it,
+				// therefore terminate
+				terminate();
+			}
+		} else {
+			LOG.error("{} received unknown message: {}", getClass(), message);
+		}
+	}
+
+	private void tryToSubmitJob() {
+		LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
+			jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+
+		Futures.future(new Callable<Object>() {
+			@Override
+			public Object call() throws Exception {
+				ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+				LOG.info("Upload jar files to job manager {}.", jobManager.path());
+
+				try {
+					jobGraph.uploadUserJars(jobManagerGateway, timeout);
+				} catch (IOException exception) {
+					getSelf().tell(
+						decorateMessage(new JobManagerMessages.JobResultFailure(
+							new SerializedThrowable(
+								new JobSubmissionException(
+									jobGraph.getJobID(),
+									"Could not upload the jar files to the job manager.",
+									exception)
+							)
+						)),
+						ActorRef.noSender()
+					);
+				}
+
+				LOG.info("Submit job to the job manager {}.", jobManager.path());
+
+				jobManager.tell(
+					decorateMessage(
+						new JobManagerMessages.SubmitJob(
+							jobGraph,
+							ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+					getSelf());
+
+				// issue a SubmissionTimeout message to check that we submit the job within
+				// the given timeout
+				getContext().system().scheduler().scheduleOnce(
+					timeout,
+					getSelf(),
+					decorateMessage(JobClientMessages.getSubmissionTimeout()),
+					getContext().dispatcher(),
+					ActorRef.noSender());
+
+				return null;
+			}
+		}, getContext().dispatcher());
+	}
+
+
+	public static Props createActorProps(
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutUpdates) {
+		return Props.create(
+			JobSubmissionClientActor.class,
+			leaderRetrievalService,
+			timeout,
+			sysoutUpdates);
+	}
+}
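
For illustration, a minimal sketch (not part of this commit) of how the one-shot submission actor
above might be driven with Akka's ask pattern. The leaderRetrievalService, timeout (a
FiniteDuration), jobGraph, and actorSystem variables are assumed to exist in the caller's context:

    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import org.apache.flink.runtime.messages.JobClientMessages;
    import scala.concurrent.Future;

    Props props = JobSubmissionClientActor.createActorProps(
        leaderRetrievalService, timeout, true);  // true: print status updates to sysout
    ActorRef jobClientActor = actorSystem.actorOf(props);

    // The actor replies with the serialized job result once the job finishes, or with a
    // Status.Failure if the submission or the execution fails.
    Future<Object> answer = Patterns.ask(
        jobClientActor,
        new JobClientMessages.SubmitJobAndWait(jobGraph),
        new Timeout(timeout));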

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7a94c0f..d7e40a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -931,6 +931,7 @@ public class ExecutionGraph {
 		intermediateResults.clear();
 		currentExecutions.clear();
 		requiredJarFiles.clear();
+		requiredClasspaths.clear();
 		jobStatusListeners.clear();
 		executionListeners.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 67d7a06..a84650c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorRef
 import org.apache.flink.runtime.akka.ListeningBehaviour
 
+
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
  * submitted the job, when the start time and, if already terminated, the end time was.
@@ -37,11 +38,14 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
  * @param start Starting time
  */
 class JobInfo(
-  val client: ActorRef,
-  val listeningBehaviour: ListeningBehaviour,
+  client: ActorRef,
+  listeningBehaviour: ListeningBehaviour,
   val start: Long,
   val sessionTimeout: Long) extends Serializable {
 
+  val clients = scala.collection.mutable.HashSet[(ActorRef, ListeningBehaviour)]()
+  clients += ((client, listeningBehaviour))
+
   var sessionAlive = sessionTimeout > 0
 
   var lastActive = 0L
@@ -58,10 +62,62 @@ class JobInfo(
     }
   }
 
-  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
+
+  /**
+    * Notifies all clients by sending a message
+    * @param message the message to send
+    */
+  def notifyClients(message: Any) = {
+    clients foreach {
+      case (clientActor, _) =>
+        clientActor ! message
+    }
+  }
+
+  /**
+    * Notifies all clients which are not of type detached
+    * @param message the message to sent to non-detached clients
+    */
+  def notifyNonDetachedClients(message: Any) = {
+    clients foreach {
+      case (clientActor, ListeningBehaviour.DETACHED) =>
+        // do nothing
+      case (clientActor, _) =>
+        clientActor ! message
+    }
+  }
+
+  /**
+    * Sends a message to all job clients that match the given listening behaviour
+    * @param message the message to send to the matching clients
+    * @param listeningBehaviour the desired listening behaviour
+    */
+  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
+    clients foreach {
+      case (clientActor, `listeningBehaviour`) =>
+        clientActor ! message
+      case _ =>
+    }
+  }
 
   def setLastActive() =
     lastActive = System.currentTimeMillis()
+
+
+  override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)"
+
+  override def equals(other: Any): Boolean = other match {
+    case that: JobInfo =>
+      clients == that.clients &&
+        start == that.start &&
+        sessionTimeout == that.sessionTimeout
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    val state = Seq(clients, start, sessionTimeout)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
 }
 
 object JobInfo{
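
For illustration, a minimal sketch (not part of this commit) of the new multi-client bookkeeping,
written against the Java-visible surface of this Scala class; submittingClient (an ActorRef) and
resultMessage are placeholders:

    import org.apache.flink.runtime.akka.ListeningBehaviour;
    import org.apache.flink.runtime.jobmanager.JobInfo;

    // A JobInfo now tracks a set of (client, listeningBehaviour) pairs instead of one client.
    JobInfo jobInfo = new JobInfo(
        submittingClient,                      // ActorRef of the submitting client
        ListeningBehaviour.EXECUTION_RESULT,
        System.currentTimeMillis(),            // start time
        0L);                                   // no session timeout

    // Further clients are added when the JobManager handles RegisterJobClient; terminal
    // results then fan out to every client that did not register as DETACHED.
    jobInfo.notifyNonDetachedClients(resultMessage);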

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0587987..d35fb0a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,18 +19,16 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress}
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException}
 import java.lang.management.ManagementFactory
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.{Failure, Success}
 import akka.actor._
 import akka.pattern.ask
-
 import grizzled.slf4j.Logger
-
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
@@ -41,8 +39,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStoreFactory, SavepointStore}
-import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore, SavepointStoreFactory}
+import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -58,24 +56,22 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService}
-
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint}
-
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.InfoMessage
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.{UnknownKvStateLocation, KvStateMessage}
-import org.apache.flink.runtime.query.KvStateMessage.{NotifyKvStateUnregistered, LookupKvStateLocation, NotifyKvStateRegistered}
+import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
+import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -83,7 +79,6 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
-
 import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
@@ -479,6 +474,22 @@ class JobManager(
 
       submitJob(jobGraph, jobInfo)
 
+    case RegisterJobClient(jobID, listeningBehaviour) =>
+      val client = sender()
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) =>
+          log.info(s"Registering client for job $jobID")
+          jobInfo.clients += ((client, listeningBehaviour))
+          val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
+          executionGraph.registerJobStatusListener(listener)
+          if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+            executionGraph.registerExecutionListener(listener)
+          }
+          client ! decorateMessage(RegisterJobClientSuccess(jobID))
+        case None =>
+          client ! decorateMessage(JobNotFound(jobID))
+      }
+
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
         submitJob(
@@ -788,50 +799,53 @@ class JobManager(
               }
 
               // is the client waiting for the job result?
-              if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
-                newJobStatus match {
-                  case JobStatus.FINISHED =>
-                  try {
-                    val accumulatorResults = executionGraph.getAccumulatorsSerialized()
-                    val result = new SerializedJobExecutionResult(
-                      jobID,
-                      jobInfo.duration,
-                      accumulatorResults)
-
-                    jobInfo.client ! decorateMessage(JobResultSuccess(result))
-                  } catch {
-                    case e: Exception =>
-                      log.error(s"Cannot fetch final accumulators for job $jobID", e)
-                      val exception = new JobExecutionException(jobID,
-                        "Failed to retrieve accumulator results.", e)
+              newJobStatus match {
+                case JobStatus.FINISHED =>
+                try {
+                  val accumulatorResults = executionGraph.getAccumulatorsSerialized()
+                  val result = new SerializedJobExecutionResult(
+                    jobID,
+                    jobInfo.duration,
+                    accumulatorResults)
+
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultSuccess(result)))
+                } catch {
+                  case e: Exception =>
+                    log.error(s"Cannot fetch final accumulators for job $jobID", e)
+                    val exception = new JobExecutionException(jobID,
+                      "Failed to retrieve accumulator results.", e)
 
-                      jobInfo.client ! decorateMessage(JobResultFailure(
-                        new SerializedThrowable(exception)))
-                  }
+                    jobInfo.notifyNonDetachedClients(
+                      decorateMessage(JobResultFailure(
+                        new SerializedThrowable(exception))))
+                }
 
-                  case JobStatus.CANCELED =>
-                    // the error may be packed as a serialized throwable
-                    val unpackedError = SerializedThrowable.get(
-                      error, executionGraph.getUserClassLoader())
+                case JobStatus.CANCELED =>
+                  // the error may be packed as a serialized throwable
+                  val unpackedError = SerializedThrowable.get(
+                    error, executionGraph.getUserClassLoader())
 
-                    jobInfo.client ! decorateMessage(JobResultFailure(
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
                       new SerializedThrowable(
-                        new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
+                        new JobCancellationException(jobID, "Job was cancelled.", unpackedError)))))
 
-                  case JobStatus.FAILED =>
-                    val unpackedError = SerializedThrowable.get(
-                      error, executionGraph.getUserClassLoader())
+                case JobStatus.FAILED =>
+                  val unpackedError = SerializedThrowable.get(
+                    error, executionGraph.getUserClassLoader())
 
-                    jobInfo.client ! decorateMessage(JobResultFailure(
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
                       new SerializedThrowable(
-                        new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
-
-                  case x =>
-                    val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
-                    jobInfo.client ! decorateMessage(JobResultFailure(
-                      new SerializedThrowable(exception)))
-                    throw exception
-                }
+                        new JobExecutionException(jobID, "Job execution failed.", unpackedError)))))
+
+                case x =>
+                  val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
+                      new SerializedThrowable(exception))))
+                  throw exception
               }
             }(context.dispatcher)
           }
@@ -919,6 +933,18 @@ class JobManager(
           archive forward decorateMessage(RequestJob(jobID))
       }
 
+    case RequestClassloadingProps(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((graph, jobInfo)) =>
+          sender() ! decorateMessage(
+            ClassloadingProps(
+              libraryCacheManager.getBlobServerPort,
+              graph.getRequiredJarFiles,
+              graph.getRequiredClasspaths))
+        case None =>
+          sender() ! decorateMessage(JobNotFound(jobID))
+      }
+
     case RequestBlobManagerPort =>
       sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
 
@@ -1052,11 +1078,10 @@ class JobManager(
    */
   private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
     if (jobGraph == null) {
-      jobInfo.client ! decorateMessage(JobResultFailure(
-        new SerializedThrowable(
-          new JobSubmissionException(null, "JobGraph must not be null.")
-        )
-      ))
+      jobInfo.notifyClients(
+        decorateMessage(JobResultFailure(
+          new SerializedThrowable(
+            new JobSubmissionException(null, "JobGraph must not be null.")))))
     }
     else {
       val jobId = jobGraph.getJobID
@@ -1259,13 +1284,15 @@ class JobManager(
         executionGraph.registerJobStatusListener(
           new StatusListenerMessenger(self, leaderSessionID.orNull))
 
-        if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+        jobInfo.clients foreach {
           // the sender wants to be notified about state changes
-          val listener  = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull)
-
-          executionGraph.registerExecutionListener(listener)
-          executionGraph.registerJobStatusListener(listener)
+          case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
+            val listener  = new StatusListenerMessenger(client, leaderSessionID.orNull)
+            executionGraph.registerExecutionListener(listener)
+            executionGraph.registerJobStatusListener(listener)
+          case _ => // do nothing
         }
+
       } catch {
         case t: Throwable =>
           log.error(s"Failed to submit job $jobId ($jobName)", t)
@@ -1283,7 +1310,8 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
           }
 
-          jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
+          jobInfo.notifyClients(
+            decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
           return
       }
 
@@ -1338,7 +1366,8 @@ class JobManager(
             }
           }
 
-          jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+          jobInfo.notifyClients(
+            decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
 
           if (leaderElectionService.hasLeadership) {
             // There is a small chance that multiple job managers schedule the same job after if
@@ -1740,10 +1769,10 @@ class JobManager(
       future {
         eg.suspend(cause)
 
-        if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
-          jobInfo.client ! decorateMessage(
-            Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
-        }
+        jobInfo.notifyNonDetachedClients(
+          decorateMessage(
+            Failure(
+              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
       }(context.dispatcher)
     }
 

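For illustration, a minimal sketch (not part of this commit) of how a client-side ActorGateway
might exercise the two new JobManager messages handled above; jobManagerGateway, jobID, and
timeout (a FiniteDuration) are assumed to exist in the caller's context:

    import org.apache.flink.runtime.akka.ListeningBehaviour;
    import org.apache.flink.runtime.messages.JobManagerMessages;
    import scala.concurrent.Future;

    // Ask the JobManager to also send state changes and the final result of a running job to us.
    // Expected replies: RegisterJobClientSuccess(jobID) or JobNotFound(jobID).
    Future<Object> registration = jobManagerGateway.ask(
        new JobManagerMessages.RegisterJobClient(
            jobID, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES),
        timeout);

    // Ask for everything needed to rebuild the user-code class loader of that job.
    // Expected replies: ClassloadingProps(port, jars, classpaths) or JobNotFound(jobID).
    Future<Object> classloadingProps = jobManagerGateway.ask(
        new JobManagerMessages.RequestClassloadingProps(jobID),
        timeout);
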
http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index a60fa7a..1f29e32 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages
 import java.util.UUID
 
 import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobGraph
 
 /**
@@ -29,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 object JobClientMessages {
 
   /**
-   * This message is sent to the JobClient (via ask) to submit a job and
+   * This message is sent to the JobClientActor (via ask) to submit a job and
    * get a response when the job execution has finished.
    * 
    * The response to this message is a
@@ -40,15 +41,11 @@ object JobClientMessages {
   case class SubmitJobAndWait(jobGraph: JobGraph)
 
   /**
-   * This message is sent to the JobClient (via ask) to submit a job and 
-   * return as soon as the result of the submit operation is known. 
-   *
-   * The response to this message is a
-   * [[org.apache.flink.api.common.JobSubmissionResult]]
-   *
-   * @param jobGraph The job to be executed.
-   */
-  case class SubmitJobDetached(jobGraph: JobGraph)
+    * This message is sent to the JobClientActor to ask it to register with the JobManager
+    * for an existing job and to reply once that job's execution is complete.
+    * @param jobID The id of the running job to attach to
+    */
+  case class AttachToJobAndWait(jobID: JobID)
 
   /** Notifies the JobClientActor about a new leader address and a leader session ID.
     *
@@ -66,9 +63,13 @@ object JobClientMessages {
   /** Message which is triggered when the submission timeout has been reached. */
   case object SubmissionTimeout extends RequiresLeaderSessionID
 
-  /** Messaeg which is triggered when the connection timeout has been reached. */
+  /** Message which is triggered when the JobClient registration at the JobManager times out */
+  case object RegistrationTimeout extends RequiresLeaderSessionID
+
+  /** Message which is triggered when the connection timeout has been reached. */
   case object ConnectionTimeout extends RequiresLeaderSessionID
 
   def getSubmissionTimeout(): AnyRef = SubmissionTimeout
+  def getRegistrationTimeout(): AnyRef = RegistrationTimeout
   def getConnectionTimeout(): AnyRef = ConnectionTimeout
 }
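
For illustration, a minimal sketch (not part of this commit) of how the new AttachToJobAndWait
message might be used: instead of shipping a JobGraph, the client actor is asked to register for
an existing job id and to answer once that job reaches a terminal state. The client actor
reference, jobID, and timeout are assumed to exist in the caller's context:

    import akka.pattern.Patterns;
    import akka.util.Timeout;
    import org.apache.flink.runtime.messages.JobClientMessages;
    import scala.concurrent.Future;

    // Completed with the serialized job result, or with a Status.Failure on errors and timeouts.
    Future<Object> answer = Patterns.ask(
        attachmentClientActor,
        new JobClientMessages.AttachToJobAndWait(jobID),
        new Timeout(timeout));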

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 14f72b0..40c4dcf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages
 
+import java.net.URL
 import java.util.UUID
 
 import akka.actor.ActorRef
@@ -69,6 +70,19 @@ object JobManagerMessages {
     extends RequiresLeaderSessionID
 
   /**
+    * Registers the sender of the message as the client for the provided job identifier.
+    * This message is acknowledged by the JobManager with [[RegisterJobClientSuccess]]
+    * or with [[JobNotFound]] if no job with that id is currently running.
+    * @param jobID The id of the job to register for
+    * @param listeningBehaviour The types of updates which will be sent to the sender
+    * after registration
+    */
+  case class RegisterJobClient(
+      jobID: JobID,
+      listeningBehaviour: ListeningBehaviour)
+    extends RequiresLeaderSessionID
+
+  /**
    * Triggers the recovery of the job with the given ID.
    *
    * @param jobId ID of the job to recover
@@ -195,6 +209,23 @@ object JobManagerMessages {
   case object RequestTotalNumberOfSlots
 
   /**
+    * Requests all entities necessary for reconstructing a job's class loader.
+    * The JobManager responds with [[ClassloadingProps]] or [[JobNotFound]].
+    * @param jobId The job id of the registered job
+    */
+  case class RequestClassloadingProps(jobId: JobID)
+
+  /**
+    * Response to [[RequestClassloadingProps]]
+    * @param blobManagerPort The port of the blobManager
+    * @param requiredJarFiles The blob keys of the required jar files
+    * @param requiredClasspaths The urls of the required classpaths
+    */
+  case class ClassloadingProps(blobManagerPort: Integer,
+                               requiredJarFiles: java.util.List[BlobKey],
+                               requiredClasspaths: java.util.List[URL])
+
+  /**
    * Requests the port of the blob manager from the job manager. The result is sent back to the
    * sender as an [[Int]].
    */
@@ -218,16 +249,27 @@ object JobManagerMessages {
   case class JobSubmitSuccess(jobId: JobID)
 
   /**
+    * Denotes a successful registration of a JobClientActor for a running job
+    * @param jobId The job id of the registered job
+    */
+  case class RegisterJobClientSuccess(jobId: JobID)
+
+  /**
+    * Denotes messages which contain the result of a completed job execution
+    */
+  sealed trait JobResultMessage
+
+  /**
    * Denotes a successful job execution.
    * @param result The result of the job execution, in serialized form.
    */
-  case class JobResultSuccess(result: SerializedJobExecutionResult)
+  case class JobResultSuccess(result: SerializedJobExecutionResult) extends JobResultMessage
 
   /**
    * Denotes an unsuccessful job execution.
    * @param cause The exception that caused the job to fail, in serialized form.
    */
-  case class JobResultFailure(cause: SerializedThrowable)
+  case class JobResultFailure(cause: SerializedThrowable) extends JobResultMessage
 
 
   sealed trait CancellationResponse{
@@ -316,7 +358,7 @@ object JobManagerMessages {
 
   /**
    * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of
-   * [[RequestJob]] or [[RequestJobStatus]].
+   * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]].
    *
    * @param jobID
    */