You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/10 09:14:08 UTC

[1/2] flink git commit: [FLINK-7372] [JobGraph] Remove ActorGateway from JobGraph

Repository: flink
Updated Branches:
  refs/heads/master e0d8c147e -> dfaec3370


[FLINK-7372] [JobGraph] Remove ActorGateway from JobGraph

The JobGraph has an unnecessary dependency on the ActorGateway via its
JobGraph#uploadUserJars method. In order to get rid of this dependency
for future Flip-6 changes, this commit retrieves the BlobServer's
address beforehand and directly passes it to this method.

This closes #4483.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d52ccd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d52ccd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d52ccd29

Branch: refs/heads/master
Commit: d52ccd2941ff25c3c61146b25c52df1ddc09d8da
Parents: e0d8c14
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Aug 5 00:28:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Aug 10 10:56:48 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/client/program/ClientTest.java |  5 +-
 .../webmonitor/handlers/JarRunHandler.java      | 15 ++++-
 .../apache/flink/runtime/blob/BlobClient.java   | 56 ++---------------
 .../apache/flink/runtime/client/JobClient.java  | 39 +++++++++++-
 .../client/JobSubmissionClientActor.java        | 30 +++++++++-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 63 +++-----------------
 .../runtime/client/JobClientActorTest.java      |  7 ++-
 7 files changed, 101 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 97794dd..ba2fc94 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -322,8 +322,9 @@ public class ClientTest extends TestLogger {
 				getSender().tell(
 						decorateMessage(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionID)),
 						getSelf());
-			}
-			else {
+			} else if (message instanceof JobManagerMessages.RequestBlobManagerPort$) {
+				getSender().tell(1337, getSelf());
+			} else {
 				getSender().tell(
 						decorateMessage(new Status.Failure(new Exception("Unknown message " + message))),
 						getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 9a7cabe..303b180 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -31,7 +31,10 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,8 +64,18 @@ public class JarRunHandler extends JarActionHandler {
 		try {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
+
+			final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManager, timeout);
+			final InetSocketAddress blobServerAddress;
+
+			try {
+				blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+			} catch (Exception e) {
+				throw new ProgramInvocationException("Failed to retrieve BlobServer address.", e);
+			}
+
 			try {
-				graph.f0.uploadUserJars(jobManager, timeout, clientConfig);
+				graph.f0.uploadUserJars(blobServerAddress, clientConfig);
 			} catch (IOException e) {
 				throw new ProgramInvocationException("Failed to upload jar files to the job manager", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ce59d75..0882ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -24,16 +24,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
@@ -484,50 +478,6 @@ public final class BlobClient implements Closeable {
 	}
 
 	/**
-	 * Retrieves the {@link BlobServer} address from the JobManager and uploads
-	 * the JAR files to it.
-	 *
-	 * @param jobManager   Server address of the {@link BlobServer}
-	 * @param askTimeout   Ask timeout for blob server address retrieval
-	 * @param clientConfig Any additional configuration for the blob client
-	 * @param jars         List of JAR files to upload
-	 * @throws IOException Thrown if the address retrieval or upload fails
-	 */
-	public static List<BlobKey> uploadJarFiles(
-			ActorGateway jobManager,
-			FiniteDuration askTimeout,
-			Configuration clientConfig,
-			List<Path> jars) throws IOException {
-
-		if (jars.isEmpty()) {
-			return Collections.emptyList();
-		} else {
-			Object msg = JobManagerMessages.getRequestBlobManagerPort();
-			Future<Object> futureBlobPort = jobManager.ask(msg, askTimeout);
-
-			try {
-				// Retrieve address
-				Object result = Await.result(futureBlobPort, askTimeout);
-				if (result instanceof Integer) {
-					int port = (Integer) result;
-					LOG.info("Blob client connecting to " + jobManager.path());
-
-					Option<String> jmHost = jobManager.actor().path().address().host();
-					String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
-					InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
-
-					// Now, upload
-					return uploadJarFiles(serverAddress, clientConfig, jars);
-				} else {
-					throw new Exception("Expected port number (int) as answer, received " + result);
-				}
-			} catch (Exception e) {
-				throw new IOException("Could not retrieve the JobManager's blob port.", e);
-			}
-		}
-	}
-
-	/**
 	 * Uploads the JAR files to a {@link BlobServer} at the given address.
 	 *
 	 * @param serverAddress Server address of the {@link BlobServer}
@@ -535,8 +485,10 @@ public final class BlobClient implements Closeable {
 	 * @param jars List of JAR files to upload
 	 * @throws IOException Thrown if the upload fails
 	 */
-	public static List<BlobKey> uploadJarFiles(InetSocketAddress serverAddress,
-			Configuration clientConfig, List<Path> jars) throws IOException {
+	public static List<BlobKey> uploadJarFiles(
+			InetSocketAddress serverAddress,
+			Configuration clientConfig,
+			List<Path> jars) throws IOException {
 		if (jars.isEmpty()) {
 			return Collections.emptyList();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 6a49564..01d09a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -50,12 +51,15 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -417,8 +421,19 @@ public class JobClient {
 		checkNotNull(timeout, "The timeout must not be null.");
 
 		LOG.info("Checking and uploading JAR files");
+
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout);
+
+		final InetSocketAddress blobServerAddress;
+
+		try {
+			blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			throw new JobSubmissionException(jobGraph.getJobID(), "Could not retrieve BlobServer address.", e);
+		}
+
 		try {
-			jobGraph.uploadUserJars(jobManagerGateway, timeout, config);
+			jobGraph.uploadUserJars(blobServerAddress, config);
 		}
 		catch (IOException e) {
 			throw new JobSubmissionException(jobGraph.getJobID(),
@@ -473,4 +488,26 @@ public class JobClient {
 		}
 	}
 
+	/**
+	 * Utility method to retrieve the BlobServer address from the given JobManager gateway.
+	 *
+	 * @param jobManagerGateway to obtain the BlobServer address from
+	 * @param timeout for this operation
+	 * @return CompletableFuture containing the BlobServer address
+	 */
+	public static CompletableFuture<InetSocketAddress> retrieveBlobServerAddress(
+			ActorGateway jobManagerGateway,
+			FiniteDuration timeout) {
+
+		CompletableFuture<Integer> futureBlobPort = FutureUtils.toJava(
+			jobManagerGateway
+				.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout)
+				.mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+
+		final Option<String> jmHost = jobManagerGateway.actor().path().address().host();
+		final String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+
+		return futureBlobPort.thenApply(
+			(Integer blobPort) -> new InetSocketAddress(jmHostname, blobPort));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index babb0f6..7d9f452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -36,7 +36,10 @@ import org.apache.flink.runtime.util.SerializedThrowable;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -144,8 +147,28 @@ public class JobSubmissionClientActor extends JobClientActor {
 
 				LOG.info("Upload jar files to job manager {}.", jobManager.path());
 
+				final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManagerGateway, timeout);
+				final InetSocketAddress blobServerAddress;
+
 				try {
-					jobGraph.uploadUserJars(jobManagerGateway, timeout, clientConfig);
+					blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+				} catch (Exception e) {
+					getSelf().tell(
+						decorateMessage(new JobManagerMessages.JobResultFailure(
+							new SerializedThrowable(
+								new JobSubmissionException(
+									jobGraph.getJobID(),
+									"Could not retrieve BlobServer address.",
+									e)
+							)
+						)),
+						ActorRef.noSender());
+
+					return null;
+				}
+
+				try {
+					jobGraph.uploadUserJars(blobServerAddress, clientConfig);
 				} catch (IOException exception) {
 					getSelf().tell(
 						decorateMessage(new JobManagerMessages.JobResultFailure(
@@ -156,8 +179,9 @@ public class JobSubmissionClientActor extends JobClientActor {
 									exception)
 							)
 						)),
-						ActorRef.noSender()
-					);
+						ActorRef.noSender());
+
+					return null;
 				}
 
 				LOG.info("Submit job to the job manager {}.", jobManager.path());

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2f5cd25..f0327a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -22,15 +22,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -527,65 +523,24 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Uploads the previously added user jar file to the job manager through the job manager's BLOB server.
-	 *
-	 * @param serverAddress
-	 *        the network address of the BLOB server
-	 * @param blobClientConfig
-	 *        the blob client configuration
-	 * @throws IOException
-	 *         thrown if an I/O error occurs during the upload
-	 */
-	public void uploadRequiredJarFiles(InetSocketAddress serverAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (this.userJars.isEmpty()) {
-			return;
-		}
-
-		BlobClient bc = null;
-		try {
-			bc = new BlobClient(serverAddress, blobClientConfig);
-
-			for (final Path jar : this.userJars) {
-
-				final FileSystem fs = jar.getFileSystem();
-				FSDataInputStream is = null;
-				try {
-					is = fs.open(jar);
-					final BlobKey key = bc.put(is);
-					this.userJarBlobKeys.add(key);
-				}
-				finally {
-					if (is != null) {
-						is.close();
-					}
-				}
-			}
-		}
-		finally {
-			if (bc != null) {
-				bc.close();
-			}
-		}
-	}
-
-	/**
 	 * Uploads the previously added user JAR files to the job manager through
 	 * the job manager's BLOB server. The respective port is retrieved from the
 	 * JobManager. This function issues a blocking call.
 	 *
-	 * @param jobManager JobManager actor gateway
-	 * @param askTimeout Ask timeout
+	 * @param blobServerAddress of the blob server to upload the jars to
 	 * @param blobClientConfig the blob client configuration
 	 * @throws IOException Thrown, if the file upload to the JobManager failed.
 	 */
-	public void uploadUserJars(ActorGateway jobManager, FiniteDuration askTimeout,
+	public void uploadUserJars(
+			InetSocketAddress blobServerAddress,
 			Configuration blobClientConfig) throws IOException {
-		List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jobManager, askTimeout, blobClientConfig, userJars);
+		if (!userJars.isEmpty()) {
+			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
 
-		for (BlobKey blobKey : blobKeys) {
-			if (!userJarBlobKeys.contains(blobKey)) {
-				userJarBlobKeys.add(blobKey);
+			for (BlobKey blobKey : blobKeys) {
+				if (!userJarBlobKeys.contains(blobKey)) {
+					userJarBlobKeys.add(blobKey);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 8530b0f..919a784 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -347,6 +347,9 @@ public class JobClientActorTest extends TestLogger {
 
 		@Override
 		protected void handleMessage(Object message) throws Exception {
+			if (message instanceof RequestBlobManagerPort$) {
+				getSender().tell(1337, getSelf());
+			}
 		}
 
 		@Override
@@ -388,7 +391,9 @@ public class JobClientActorTest extends TestLogger {
 					testFuture.tell(Acknowledge.get(), getSelf());
 				}
 			}
-			else if (message instanceof RegisterTest) {
+			else if (message instanceof RequestBlobManagerPort$) {
+				getSender().tell(1337, getSelf());
+			} else if (message instanceof RegisterTest) {
 				testFuture = getSender();
 
 				if (jobAccepted) {


[2/2] flink git commit: [FLINK-7375] Replace ActorGateway with JobManagerGateway in JobClient

Posted by tr...@apache.org.
[FLINK-7375] Replace ActorGateway with JobManagerGateway in JobClient

In order to make the JobClient code independent of Akka, this PR replaces the
ActorGateway parameters by JobManagerGateway. AkkaJobManagerGateway is the
respective implementation of the JobManagerGateway for Akka. Moreover, this
PR introduces useful ExceptionUtils methods for handling Future exceptions.
Additionally, the SerializedThrowable has been moved to flink-core.

This closes #4486.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfaec337
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfaec337
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfaec337

Branch: refs/heads/master
Commit: dfaec337059cc59800d6c708d03a8194db487872
Parents: d52ccd2
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Aug 6 17:56:41 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Aug 10 10:59:10 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  25 +--
 .../apache/flink/client/program/ClientTest.java |   2 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  38 ++++
 .../apache/flink/util/SerializedThrowable.java  | 181 ++++++++++++++++++
 .../webmonitor/handlers/JarRunHandler.java      |  28 +--
 .../BackPressureStatsTrackerITCase.java         |   5 +-
 .../StackTraceSampleCoordinatorITCase.java      |   5 +-
 .../runtime/akka/AkkaJobManagerGateway.java     | 122 ++++++++++++
 .../apache/flink/runtime/client/JobClient.java  | 128 +++++--------
 .../flink/runtime/client/JobClientActor.java    |   2 +-
 .../runtime/client/JobListeningContext.java     |   7 +-
 .../client/JobSubmissionClientActor.java        |  12 +-
 .../messages/FatalErrorOccurred.java            |   2 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  13 ++
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../executiongraph/StatusListenerMessenger.java |   2 +-
 .../partition/ProducerFailedException.java      |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   4 +-
 .../runtime/jobmaster/JobManagerGateway.java    |  66 +++++++
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |   2 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../flink/runtime/util/SerializedThrowable.java | 184 -------------------
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/messages/JobManagerMessages.scala   |   2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +-
 .../partition/ProducerFailedExceptionTest.java  |   2 +-
 .../runtime/util/SerializedThrowableTest.java   |   1 +
 28 files changed, 524 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 3018a8c..7bc2655 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -35,11 +35,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
-import org.apache.flink.runtime.client.JobRetrievalException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -500,7 +500,12 @@ public abstract class ClusterClient {
 
 		try {
 			logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
-			JobClient.submitJobDetached(jobManagerGateway, flinkConfig, jobGraph, timeout, classLoader);
+			JobClient.submitJobDetached(
+				new AkkaJobManagerGateway(jobManagerGateway),
+				flinkConfig,
+				jobGraph,
+				Time.milliseconds(timeout.toMillis()),
+				classLoader);
 			return new JobSubmissionResult(jobGraph.getJobID());
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
@@ -525,16 +530,8 @@ public abstract class ClusterClient {
 				fe);
 		}
 
-		ActorGateway jobManagerGateway;
-		try {
-			jobManagerGateway = getJobManagerGateway();
-		} catch (Exception e) {
-			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway");
-		}
-
 		final JobListeningContext listeningContext = JobClient.attachToRunningJob(
 			jobID,
-			jobManagerGateway,
 			flinkConfig,
 			actorSystem,
 			highAvailabilityServices,
@@ -563,16 +560,8 @@ public abstract class ClusterClient {
 				fe);
 		}
 
-		ActorGateway jobManagerGateway;
-		try {
-			jobManagerGateway = getJobManagerGateway();
-		} catch (Exception e) {
-			throw new JobRetrievalException(jobID, "Could not retrieve the JobManager Gateway", e);
-		}
-
 		return JobClient.attachToRunningJob(
 			jobID,
-			jobManagerGateway,
 			flinkConfig,
 			actorSystem,
 			highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ba2fc94..99f51ad 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index ca81465..9c8907b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -298,6 +299,43 @@ public final class ExceptionUtils {
 		return false;
 	}
 
+	/**
+	 * Unpacks an {@link ExecutionException} and returns its cause. Otherwise the given
+	 * Throwable is returned.
+	 *
+	 * @param throwable to unpack if it is an ExecutionException
+	 * @return Cause of ExecutionException or given Throwable
+	 */
+	public static Throwable stripExecutionException(Throwable throwable) {
+		while (throwable instanceof ExecutionException && throwable.getCause() != null) {
+			throwable = throwable.getCause();
+		}
+
+		return throwable;
+	}
+
+	/**
+	 * Tries to find a {@link SerializedThrowable} as the cause of the given throwable and throws its
+	 * deserialized value. If there is no such throwable, then the original throwable is thrown.
+	 *
+	 * @param throwable to check for a SerializedThrowable
+	 * @param classLoader to be used for the deserialization of the SerializedThrowable
+	 * @throws Throwable either the deserialized throwable or the given throwable
+	 */
+	public static void tryDeserializeAndThrow(Throwable throwable, ClassLoader classLoader) throws Throwable {
+		Throwable current = throwable;
+
+		while (!(current instanceof SerializedThrowable) && current.getCause() != null) {
+			current = current.getCause();
+		}
+
+		if (current instanceof SerializedThrowable) {
+			throw ((SerializedThrowable) current).deserializeError(classLoader);
+		} else {
+			throw throwable;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** Private constructor to prevent instantiation. */

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
new file mode 100644
index 0000000..dab7cda
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Utility class for dealing with user-defined Throwable types that are serialized (for
+ * example during RPC/Actor communication), but cannot be resolved with the default
+ * class loader.
+ * 
+ * <p>This exception mimics the original exception with respect to message and stack trace,
+ * and contains the original exception in serialized form. The original exception
+ * can be re-obtained by supplying the appropriate class loader.
+ */
+public class SerializedThrowable extends Exception implements Serializable {
+	
+	private static final long serialVersionUID = 7284183123441947635L;
+	
+	/** The original exception in serialized form */
+	private final byte[] serializedException;
+	
+	/** Name of the original error class */
+	private final String originalErrorClassName;
+	
+	/** The original stack trace, to be printed */
+	private final String fullStringifiedStackTrace;
+
+	/** The original exception, not transported via serialization, 
+	 * because the class may not be part of the system class loader.
+	 * In addition, we make sure our cached references do not prevent
+	 * unloading the exception class. */
+	private transient WeakReference<Throwable> cachedException;
+
+
+	/**
+	 * Create a new SerializedThrowable.
+	 * 
+	 * @param exception The exception to serialize.
+	 */
+	public SerializedThrowable(Throwable exception) {
+		this(exception, new HashSet<Throwable>());
+	}
+
+	private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
+		super(getMessageOrError(exception));
+
+		if (!(exception instanceof SerializedThrowable)) {
+			// serialize and memoize the original exception
+			byte[] serialized;
+			try {
+				serialized = InstantiationUtil.serializeObject(exception);
+			}
+			catch (Throwable t) {
+				serialized = null;
+			}
+			this.serializedException = serialized;
+			this.cachedException = new WeakReference<Throwable>(exception);
+
+			// record the original exception's properties (name, stack prints)
+			this.originalErrorClassName = exception.getClass().getName();
+			this.fullStringifiedStackTrace = ExceptionUtils.stringifyException(exception);
+
+			// mimic the original exception's stack trace
+			setStackTrace(exception.getStackTrace());
+
+			// mimic the original exception's cause
+			if (exception.getCause() == null) {
+				initCause(null);
+			}
+			else {
+				// exception causes may be cyclic, so we truncate the cycle when we find it
+				if (alreadySeen.add(exception)) {
+					// we are not in a cycle, yet
+					initCause(new SerializedThrowable(exception.getCause(), alreadySeen));
+				}
+			}
+
+		}
+		else {
+			// copy from that serialized throwable
+			SerializedThrowable other = (SerializedThrowable) exception;
+			this.serializedException = other.serializedException;
+			this.originalErrorClassName = other.originalErrorClassName;
+			this.fullStringifiedStackTrace = other.fullStringifiedStackTrace;
+			this.cachedException = other.cachedException;
+			this.setStackTrace(other.getStackTrace());
+			this.initCause(other.getCause());
+		}
+	}
+
+	public Throwable deserializeError(ClassLoader classloader) {
+		if (serializedException == null) {
+			// failed to serialize the original exception
+			// return this SerializedThrowable as a stand in
+			return this;
+		}
+
+		Throwable cached = cachedException == null ? null : cachedException.get();
+		if (cached == null) {
+			try {
+				cached = InstantiationUtil.deserializeObject(serializedException, classloader);
+				cachedException = new WeakReference<Throwable>(cached);
+			}
+			catch (Throwable t) {
+				// something went wrong
+				// return this SerializedThrowable as a stand in
+				return this;
+			}
+		}
+		return cached;
+	}
+
+	public String getOriginalErrorClassName() {
+		return originalErrorClassName;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Override the behavior of Throwable
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void printStackTrace(PrintStream s) {
+		s.print(fullStringifiedStackTrace);
+		s.flush();
+	}
+	
+	@Override
+	public void printStackTrace(PrintWriter s) {
+		s.print(fullStringifiedStackTrace);
+		s.flush();
+	}
+	
+	@Override
+	public String toString() {
+		String message = getLocalizedMessage();
+		return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Static utilities
+	// ------------------------------------------------------------------------
+
+	public static Throwable get(Throwable serThrowable, ClassLoader loader) {
+		if (serThrowable instanceof SerializedThrowable) {
+			return ((SerializedThrowable)serThrowable).deserializeError(loader);
+		} else {
+			return serThrowable;
+		}
+	}
+
+	private static String getMessageOrError(Throwable error) {
+		try {
+			return error.getMessage();
+		}
+		catch (Throwable t) {
+			return "(failed to get message)";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 303b180..282fea8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -29,12 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.StringWriter;
-import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
@@ -65,23 +63,13 @@ public class JarRunHandler extends JarActionHandler {
 			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
 			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 
-			final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManager, timeout);
-			final InetSocketAddress blobServerAddress;
-
-			try {
-				blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Failed to retrieve BlobServer address.", e);
-			}
-
-			try {
-				graph.f0.uploadUserJars(blobServerAddress, clientConfig);
-			} catch (IOException e) {
-				throw new ProgramInvocationException("Failed to upload jar files to the job manager", e);
-			}
-
 			try {
-				JobClient.submitJobDetached(jobManager, clientConfig, graph.f0, timeout, graph.f1);
+				JobClient.submitJobDetached(
+					new AkkaJobManagerGateway(jobManager),
+					clientConfig,
+					graph.f0,
+					Time.milliseconds(timeout.toMillis()),
+					graph.f1);
 			} catch (JobExecutionException e) {
 				throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 4d80145..0e4734d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -156,10 +157,10 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 
 							// Submit the job and wait until it is running
 							JobClient.submitJobDetached(
-									jm,
+									new AkkaJobManagerGateway(jm),
 									config,
 									jobGraph,
-									deadline,
+									Time.milliseconds(deadline.toMillis()),
 									ClassLoader.getSystemClassLoader());
 
 							jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 47b43a5..bd12668 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -131,10 +132,10 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 							for (int i = 0; i < maxAttempts; i++, sleepTime *= 2) {
 								// Submit the job and wait until it is running
 								JobClient.submitJobDetached(
-										jm,
+										new AkkaJobManagerGateway(jm),
 										config,
 										jobGraph,
-										deadline,
+										Time.milliseconds(deadline.toMillis()),
 										ClassLoader.getSystemClassLoader());
 
 								jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
new file mode 100644
index 0000000..6ee78dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import scala.Option;
+import scala.reflect.ClassTag$;
+
+/**
+ * Implementation of the {@link JobManagerGateway} for the {@link ActorGateway}.
+ */
+public class AkkaJobManagerGateway implements JobManagerGateway {
+
+	private final ActorGateway jobManagerGateway;
+	private final String hostname;
+
+	public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
+		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
+
+		final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
+
+		hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
+	}
+
+	@Override
+	public String getAddress() {
+		return jobManagerGateway.path();
+	}
+
+	@Override
+	public String getHostname() {
+		return hostname;
+	}
+
+	@Override
+	public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
+		return FutureUtils
+			.toJava(jobManagerGateway
+				.ask(
+					new JobManagerMessages.RequestClassloadingProps(jobId),
+					FutureUtils.toFiniteDuration(timeout)))
+			.thenApply(
+				(Object response) -> {
+					if (response instanceof JobManagerMessages.ClassloadingProps) {
+						return Optional.of(((JobManagerMessages.ClassloadingProps) response));
+					} else if (response instanceof JobManagerMessages.JobNotFound) {
+						return Optional.empty();
+					} else {
+						throw new FlinkFutureException("Unknown response: " + response + '.');
+					}
+				});
+	}
+
+	@Override
+	public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
+		return FutureUtils.toJava(
+			jobManagerGateway
+				.ask(JobManagerMessages.getRequestBlobManagerPort(), FutureUtils.toFiniteDuration(timeout))
+				.mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
+		return FutureUtils
+			.toJava(
+				jobManagerGateway.ask(
+					new JobManagerMessages.SubmitJob(
+						jobGraph,
+						listeningBehaviour),
+					FutureUtils.toFiniteDuration(timeout)))
+			.thenApply(
+				(Object response) -> {
+					if (response instanceof JobManagerMessages.JobSubmitSuccess) {
+						JobManagerMessages.JobSubmitSuccess success = ((JobManagerMessages.JobSubmitSuccess) response);
+
+						if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
+							return Acknowledge.get();
+						} else {
+							throw new FlinkFutureException("JobManager responded for wrong Job. This Job: " +
+								jobGraph.getJobID() + ", response: " + success.jobId());
+						}
+					} else if (response instanceof JobManagerMessages.JobResultFailure) {
+						JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
+
+						throw new FlinkFutureException("Job submission failed.", failure.cause());
+					} else {
+						throw new FlinkFutureException("Unknown response to SubmitJob message: " + response + '.');
+					}
+				}
+			);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 01d09a1..562e697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -28,19 +28,21 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +53,13 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -144,7 +146,6 @@ public class JobClient {
 	 */
 	public static JobListeningContext attachToRunningJob(
 			JobID jobID,
-			ActorGateway jobManagerGateWay,
 			Configuration configuration,
 			ActorSystem actorSystem,
 			HighAvailabilityServices highAvailabilityServices,
@@ -152,7 +153,6 @@ public class JobClient {
 			boolean sysoutLogUpdates) {
 
 		checkNotNull(jobID, "The jobID must not be null.");
-		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
 		checkNotNull(configuration, "The configuration must not be null.");
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
 		checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
@@ -193,36 +193,37 @@ public class JobClient {
 	 * @throws JobRetrievalException if anything goes wrong
 	 */
 	public static ClassLoader retrieveClassLoader(
-		JobID jobID,
-		ActorGateway jobManager,
-		Configuration config,
-		HighAvailabilityServices highAvailabilityServices)
+			JobID jobID,
+			JobManagerGateway jobManager,
+			Configuration config,
+			HighAvailabilityServices highAvailabilityServices,
+			Time timeout)
 		throws JobRetrievalException {
 
-		final Object jmAnswer;
+		final CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> clPropsFuture = jobManager
+			.requestClassloadingProps(jobID, timeout);
+
+		final Optional<JobManagerMessages.ClassloadingProps> optProps;
+
 		try {
-			jmAnswer = Await.result(
-				jobManager.ask(
-					new JobManagerMessages.RequestClassloadingProps(jobID),
-					AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
-				AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+			optProps = clPropsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		} catch (Exception e) {
-			throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
+			throw new JobRetrievalException(jobID, "Could not retrieve the class loading properties from JobManager.", e);
 		}
 
-		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
-			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
+		if (optProps.isPresent()) {
+			JobManagerMessages.ClassloadingProps props = optProps.get();
 
-			Option<String> jmHost = jobManager.actor().path().address().host();
-			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
-			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
+			InetSocketAddress serverAddress = new InetSocketAddress(jobManager.getHostname(), props.blobManagerPort());
 			final BlobCache blobClient;
 			try {
 				// TODO: Fix lifecycle of BlobCache to properly close it upon usage
 				blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
 			} catch (IOException e) {
-				throw new JobRetrievalException(jobID,
-					"Failed to setup blob cache", e);
+				throw new JobRetrievalException(
+					jobID,
+					"Failed to setup BlobCache.",
+					e);
 			}
 
 			final Collection<BlobKey> requiredJarFiles = props.requiredJarFiles();
@@ -250,10 +251,8 @@ public class JobClient {
 			}
 
 			return new FlinkUserCodeClassLoader(allURLs, JobClient.class.getClassLoader());
-		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
-			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
 		} else {
-			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
+			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
 		}
 	}
 
@@ -407,13 +406,13 @@ public class JobClient {
 	 * @param jobManagerGateway Gateway to the JobManager which will execute the jobs
 	 * @param config The cluster wide configuration.
 	 * @param jobGraph The job
-	 * @param timeout  Timeout in which the JobManager must have responded.
+	 * @param timeout Timeout in which the JobManager must have responded.
 	 */
 	public static void submitJobDetached(
-			ActorGateway jobManagerGateway,
+			JobManagerGateway jobManagerGateway,
 			Configuration config,
 			JobGraph jobGraph,
-			FiniteDuration timeout,
+			Time timeout,
 			ClassLoader classLoader) throws JobExecutionException {
 
 		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
@@ -427,7 +426,7 @@ public class JobClient {
 		final InetSocketAddress blobServerAddress;
 
 		try {
-			blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+			blobServerAddress = blobServerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		} catch (Exception e) {
 			throw new JobSubmissionException(jobGraph.getJobID(), "Could not retrieve BlobServer address.", e);
 		}
@@ -440,52 +439,27 @@ public class JobClient {
 				"Could not upload the program's JAR files to the JobManager.", e);
 		}
 
-		Object result;
+		CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
+
 		try {
-			Future<Object> future = jobManagerGateway.ask(
-				new JobManagerMessages.SubmitJob(
-					jobGraph,
-					ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message
-				),
-				timeout);
-
-			result = Await.result(future, timeout);
-		}
-		catch (TimeoutException e) {
+			submissionFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		} catch (TimeoutException e) {
 			throw new JobTimeoutException(jobGraph.getJobID(),
-					"JobManager did not respond within " + timeout.toString(), e);
-		}
-		catch (Throwable t) {
-			throw new JobSubmissionException(jobGraph.getJobID(),
-					"Failed to send job to JobManager: " + t.getMessage(), t.getCause());
-		}
+				"JobManager did not respond within " + timeout, e);
+		} catch (Throwable throwable) {
+			Throwable stripped = ExceptionUtils.stripExecutionException(throwable);
 
-		if (result instanceof JobManagerMessages.JobSubmitSuccess) {
-			JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
-
-			// validate response
-			if (!respondedID.equals(jobGraph.getJobID())) {
-				throw new JobExecutionException(jobGraph.getJobID(),
-						"JobManager responded for wrong Job. This Job: " +
-						jobGraph.getJobID() + ", response: " + respondedID);
-			}
-		}
-		else if (result instanceof JobManagerMessages.JobResultFailure) {
 			try {
-				SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
-				throw t.deserializeError(classLoader);
-			}
-			catch (JobExecutionException e) {
-				throw e;
-			}
-			catch (Throwable t) {
-				throw new JobExecutionException(jobGraph.getJobID(),
-						"JobSubmission failed: " + t.getMessage(), t);
+				ExceptionUtils.tryDeserializeAndThrow(stripped, classLoader);
+			} catch (JobExecutionException jee) {
+				throw jee;
+			} catch (Throwable t) {
+				throw new JobExecutionException(
+					jobGraph.getJobID(),
+					"JobSubmission failed.",
+					t);
 			}
 		}
-		else {
-			throw new JobExecutionException(jobGraph.getJobID(), "Unexpected response from JobManager: " + result);
-		}
 	}
 
 	/**
@@ -496,16 +470,12 @@ public class JobClient {
 	 * @return CompletableFuture containing the BlobServer address
 	 */
 	public static CompletableFuture<InetSocketAddress> retrieveBlobServerAddress(
-			ActorGateway jobManagerGateway,
-			FiniteDuration timeout) {
+			JobManagerGateway jobManagerGateway,
+			Time timeout) {
 
-		CompletableFuture<Integer> futureBlobPort = FutureUtils.toJava(
-			jobManagerGateway
-				.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout)
-				.mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+		CompletableFuture<Integer> futureBlobPort = jobManagerGateway.requestBlobServerPort(timeout);
 
-		final Option<String> jmHost = jobManagerGateway.actor().path().address().host();
-		final String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+		final String jmHostname = jobManagerGateway.getHostname();
 
 		return futureBlobPort.thenApply(
 			(Integer blobPort) -> new InetSocketAddress(jmHostname, blobPort));

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 793041f..ccf8b49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index bb448be..eb045c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.client;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -136,9 +138,10 @@ public final class JobListeningContext {
 			// lazily initializes the class loader when it is needed
 			classLoader = JobClient.retrieveClassLoader(
 				jobID,
-				getJobManager(),
+				new AkkaJobManagerGateway(getJobManager()),
 				configuration,
-				highAvailabilityServices);
+				highAvailabilityServices,
+				Time.milliseconds(timeout.toMillis()));
 			LOG.info("Reconstructed class loader for Job {}", jobID);
 		}
 		return classLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 7d9f452..4ca6e8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -22,8 +22,11 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.dispatch.Futures;
+
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -32,7 +35,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -143,11 +146,14 @@ public class JobSubmissionClientActor extends JobClientActor {
 		Futures.future(new Callable<Object>() {
 			@Override
 			public Object call() throws Exception {
-				ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+				final ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+				final AkkaJobManagerGateway akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway);
 
 				LOG.info("Upload jar files to job manager {}.", jobManager.path());
 
-				final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManagerGateway, timeout);
+				final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
+					akkaJobManagerGateway,
+					Time.milliseconds(timeout.toMillis()));
 				final InetSocketAddress blobServerAddress;
 
 				try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
index f91f3fe..ae6bb39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.messages;
 
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index e0a9a0b..043c603 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
 import akka.dispatch.OnComplete;
@@ -29,10 +30,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -292,6 +295,16 @@ public class FutureUtils {
 		return result;
 	}
 
+	/**
+	 * Converts Flink time into a {@link FiniteDuration}.
+	 *
+	 * @param time to convert into a FiniteDuration
+	 * @return FiniteDuration with the length of the given time
+	 */
+	public static FiniteDuration toFiniteDuration(Time time) {
+		return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Converting futures
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ae9b5f1..139f484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -63,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
index 01f1e75..ea69c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.util.UUID;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 934234d..0ffac2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * Network-stack level Exception to notify remote receiver about a failed

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f0327a3..1c68515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -524,8 +524,8 @@ public class JobGraph implements Serializable {
 
 	/**
 	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The respective port is retrieved from the
-	 * JobManager. This function issues a blocking call.
+	 * the job manager's BLOB server. The BLOB server's address is given as a
+	 * parameter. This function issues a blocking call.
 	 *
 	 * @param blobServerAddress of the blob server to upload the jars to
 	 * @param blobClientConfig the blob client configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
new file mode 100644
index 0000000..cba7b06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Public JobManager gateway.
+ *
+ * <p>This interface constitutes the operations an external component can
+ * trigger on the JobManager.
+ */
+public interface JobManagerGateway extends RpcGateway {
+
+	/**
+	 * Requests the class loading properties for the given JobID.
+	 *
+	 * @param jobId for which the class loading properties are requested
+	 * @param timeout for this operation
+	 * @return Future containing the optional class loading properties if they could be retrieved from the JobManager.
+	 */
+	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);
+
+	/**
+	 * Requests the BlobServer port.
+	 *
+	 * @param timeout for this operation
+	 * @return Future containing the BlobServer port
+	 */
+	CompletableFuture<Integer> requestBlobServerPort(Time timeout);
+
+	/**
+	 * Submits a job to the JobManager.
+	 *
+	 * @param jobGraph to submit
+	 * @param listeningBehaviour of the client
+	 * @param timeout for this operation
+	 * @return Future containing an Acknowledge message if the submission succeeded
+	 */
+	CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e2e117a..cdae89f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -91,7 +91,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index 830b751..7b0b55c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheck
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 9395435..bd66aa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
deleted file mode 100644
index 63f4363..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.lang.ref.WeakReference;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Utility class for dealing with user-defined Throwable types that are serialized (for
- * example during RPC/Actor communication), but cannot be resolved with the default
- * class loader.
- * 
- * <p>This exception mimics the original exception with respect to message and stack trace,
- * and contains the original exception in serialized form. The original exception
- * can be re-obtained by supplying the appropriate class loader.
- */
-public class SerializedThrowable extends Exception implements Serializable {
-	
-	private static final long serialVersionUID = 7284183123441947635L;
-	
-	/** The original exception in serialized form */
-	private final byte[] serializedException;
-	
-	/** Name of the original error class */
-	private final String originalErrorClassName;
-	
-	/** The original stack trace, to be printed */
-	private final String fullStringifiedStackTrace;
-
-	/** The original exception, not transported via serialization, 
-	 * because the class may not be part of the system class loader.
-	 * In addition, we make sure our cached references to not prevent
-	 * unloading the exception class. */
-	private transient WeakReference<Throwable> cachedException;
-
-
-	/**
-	 * Create a new SerializedThrowable.
-	 * 
-	 * @param exception The exception to serialize.
-	 */
-	public SerializedThrowable(Throwable exception) {
-		this(exception, new HashSet<Throwable>());
-	}
-
-	private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
-		super(getMessageOrError(exception));
-
-		if (!(exception instanceof SerializedThrowable)) {
-			// serialize and memoize the original message
-			byte[] serialized;
-			try {
-				serialized = InstantiationUtil.serializeObject(exception);
-			}
-			catch (Throwable t) {
-				serialized = null;
-			}
-			this.serializedException = serialized;
-			this.cachedException = new WeakReference<Throwable>(exception);
-
-			// record the original exception's properties (name, stack prints)
-			this.originalErrorClassName = exception.getClass().getName();
-			this.fullStringifiedStackTrace = ExceptionUtils.stringifyException(exception);
-
-			// mimic the original exception's stack trace
-			setStackTrace(exception.getStackTrace());
-
-			// mimic the original exception's cause
-			if (exception.getCause() == null) {
-				initCause(null);
-			}
-			else {
-				// exception causes may by cyclic, so we truncate the cycle when we find it 
-				if (alreadySeen.add(exception)) {
-					// we are not in a cycle, yet
-					initCause(new SerializedThrowable(exception.getCause(), alreadySeen));
-				}
-			}
-
-		}
-		else {
-			// copy from that serialized throwable
-			SerializedThrowable other = (SerializedThrowable) exception;
-			this.serializedException = other.serializedException;
-			this.originalErrorClassName = other.originalErrorClassName;
-			this.fullStringifiedStackTrace = other.fullStringifiedStackTrace;
-			this.cachedException = other.cachedException;
-			this.setStackTrace(other.getStackTrace());
-			this.initCause(other.getCause());
-		}
-	}
-
-	public Throwable deserializeError(ClassLoader classloader) {
-		if (serializedException == null) {
-			// failed to serialize the original exception
-			// return this SerializedThrowable as a stand in
-			return this;
-		}
-
-		Throwable cached = cachedException == null ? null : cachedException.get();
-		if (cached == null) {
-			try {
-				cached = InstantiationUtil.deserializeObject(serializedException, classloader);
-				cachedException = new WeakReference<Throwable>(cached);
-			}
-			catch (Throwable t) {
-				// something went wrong
-				// return this SerializedThrowable as a stand in
-				return this;
-			}
-		}
-		return cached;
-	}
-
-	public String getOriginalErrorClassName() {
-		return originalErrorClassName;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Override the behavior of Throwable
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void printStackTrace(PrintStream s) {
-		s.print(fullStringifiedStackTrace);
-		s.flush();
-	}
-	
-	@Override
-	public void printStackTrace(PrintWriter s) {
-		s.print(fullStringifiedStackTrace);
-		s.flush();
-	}
-	
-	@Override
-	public String toString() {
-		String message = getLocalizedMessage();
-		return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Static utilities
-	// ------------------------------------------------------------------------
-
-	public static Throwable get(Throwable serThrowable, ClassLoader loader) {
-		if (serThrowable instanceof SerializedThrowable) {
-			return ((SerializedThrowable)serThrowable).deserializeError(loader);
-		} else {
-			return serThrowable;
-		}
-	}
-
-	private static String getMessageOrError(Throwable error) {
-		try {
-			return error.getMessage();
-		}
-		catch (Throwable t) {
-			return "(failed to get message)";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 80fa506..fc668d6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -84,7 +84,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4db2584..52f3777 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
 import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
-import org.apache.flink.runtime.util.SerializedThrowable
+import org.apache.flink.util.SerializedThrowable
 
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index f0a96ca..c5c87ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -26,10 +26,11 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
 import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
@@ -582,10 +583,11 @@ abstract class FlinkMiniCluster(
           e)
       }
 
-    JobClient.submitJobDetached(jobManagerGateway,
+    JobClient.submitJobDetached(
+      new AkkaJobManagerGateway(jobManagerGateway),
       configuration,
       jobGraph,
-      timeout,
+      Time.milliseconds(timeout.toMillis),
       userCodeClassLoader)
 
     new JobSubmissionResult(jobGraph.getJobID)

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index ca2de0c..6bff0f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 45988f5..0f309b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Test;