You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/22 08:33:17 UTC

[1/2] flink git commit: [FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem

Repository: flink
Updated Branches:
  refs/heads/master 392bc7130 -> d24651596


[FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem

The ClusterClient starts its ActorSystem lazily. In order to find out the address
to which to bind, the ClusterClient tries to connect to the JobManager. In order
to find out the JobManager's address it is important to use the
HighAvailabilityServices instead of retrieving the address information from the
configuration, because otherwise it conflicts with HA mode.

This closes #3949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2465159
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2465159
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2465159

Branch: refs/heads/master
Commit: d246515963e6f736dc78114ae1dbecbbcd93ed32
Parents: e397961
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 19 14:31:19 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 22 10:32:45 2017 +0200

----------------------------------------------------------------------
 flink-clients/pom.xml                           |   8 ++
 .../flink/client/program/ClusterClient.java     | 110 +++++++++++++------
 .../client/program/ClientConnectionTest.java    |  52 +++++++++
 .../runtime/util/LeaderRetrievalUtils.java      |   4 +-
 .../RemoteEnvironmentITCase.java                |   8 +-
 .../apache/flink/yarn/YarnClusterClient.java    |  28 +++--
 6 files changed, 165 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index deac7fc..5995ea8 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -76,6 +76,14 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 		
 		<dependency>
 			<groupId>com.data-artisans</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e09a0b6..e7314eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
@@ -62,13 +61,12 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Some;
+import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -151,7 +149,11 @@ public abstract class ClusterClient {
 		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
 		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
-		this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);
+		this.actorSystemLoader = new LazyActorSystemLoader(
+			highAvailabilityServices,
+			Time.milliseconds(lookupTimeout.toMillis()),
+			flinkConfig,
+			LOG);
 
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 	}
@@ -164,13 +166,23 @@ public abstract class ClusterClient {
 
 		private final Logger LOG;
 
-		private final Configuration flinkConfig;
+		private final HighAvailabilityServices highAvailabilityServices;
+
+		private final Time timeout;
+
+		private final Configuration configuration;
 
 		private ActorSystem actorSystem;
 
-		private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) {
-			this.flinkConfig = flinkConfig;
-			this.LOG = LOG;
+		private LazyActorSystemLoader(
+				HighAvailabilityServices highAvailabilityServices,
+				Time timeout,
+				Configuration configuration,
+				Logger LOG) {
+			this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+			this.timeout = Preconditions.checkNotNull(timeout);
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.LOG = Preconditions.checkNotNull(LOG);
 		}
 
 		/**
@@ -192,30 +204,31 @@ public abstract class ClusterClient {
 		/**
 		 * Creates a new ActorSystem or returns an existing one.
 		 * @return ActorSystem
+		 * @throws FlinkException if the ActorSystem could not be created
 		 */
-		public ActorSystem get() {
+		public ActorSystem get() throws FlinkException {
 
 			if (!isLoaded()) {
 				// start actor system
 				LOG.info("Starting client actor system.");
 
-				String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-				int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-				if (hostName == null || port == -1) {
-					throw new RuntimeException("The initial JobManager address has not been set correctly.");
+				final InetAddress ownHostname;
+				try {
+					ownHostname = LeaderRetrievalUtils.findConnectingAddress(
+						highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+						timeout);
+				} catch (LeaderRetrievalException lre) {
+					throw new FlinkException("Could not find out our own hostname by connecting to the " +
+						"leading JobManager. Please make sure that the Flink cluster has been started.", lre);
 				}
-				InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
 
-				// find name of own public interface, able to connect to the JM
-				// try to find address for 2 seconds. log after 400 ms.
-				InetAddress ownHostname;
 				try {
-					ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
-				} catch (IOException e) {
-					throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress, e);
+					actorSystem = AkkaUtils.createActorSystem(
+						configuration,
+						Option.apply(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
+				} catch (Exception e) {
+					throw new FlinkException("Could not start the ActorSystem lazily.", e);
 				}
-				actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-					new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
 			}
 
 			return actorSystem;
@@ -440,10 +453,19 @@ public abstract class ClusterClient {
 
 		waitForClusterToBeReady();
 
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
+				"JobManager.", fe);
+		}
+
 		try {
 			logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
 			this.lastJobExecutionResult = JobClient.submitJobAndWait(
-				actorSystemLoader.get(),
+				actorSystem,
 				flinkConfig,
 				highAvailabilityServices,
 				jobGraph,
@@ -451,7 +473,7 @@ public abstract class ClusterClient {
 				printStatusDuringExecution,
 				classLoader);
 
-			return this.lastJobExecutionResult;
+			return lastJobExecutionResult;
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
 		}
@@ -491,6 +513,17 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new JobExecutionException(
+				jobID,
+				"Could not start the ActorSystem needed to talk to the JobManager.",
+				fe);
+		}
+
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -502,7 +535,7 @@ public abstract class ClusterClient {
 			jobID,
 			jobManagerGateway,
 			flinkConfig,
-			actorSystemLoader.get(),
+			actorSystem,
 			highAvailabilityServices,
 			timeout,
 			printStatusDuringExecution);
@@ -518,6 +551,17 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new JobExecutionException(
+				jobID,
+				"Could not start the ActorSystem needed to talk to the JobManager.",
+				fe);
+		}
+
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -526,13 +570,13 @@ public abstract class ClusterClient {
 		}
 
 		return JobClient.attachToRunningJob(
-				jobID,
-				jobManagerGateway,
-				flinkConfig,
-				actorSystemLoader.get(),
-				highAvailabilityServices,
-				timeout,
-				printStatusDuringExecution);
+			jobID,
+			jobManagerGateway,
+			flinkConfig,
+			actorSystem,
+			highAvailabilityServices,
+			timeout,
+			printStatusDuringExecution);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 3bfaa95..246a75c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,16 +18,27 @@
 
 package org.apache.flink.client.program;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorTest;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 import static org.junit.Assert.*;
 
@@ -98,4 +109,45 @@ public class ClientConnectionTest extends TestLogger {
 			assertTrue(CommonTestUtils.containsCause(e, LeaderRetrievalException.class));
 		}
 	}
+
+	/**
+	 * FLINK-6629
+	 *
+	 * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
+	 * {@link ActorSystem} and retrieving the leading JobManager.
+	 */
+	@Test
+	public void testJobManagerRetrievalWithHAServices() throws Exception {
+		final Configuration configuration = new Configuration();
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+		ActorRef actorRef = null;
+		final UUID leaderId = UUID.randomUUID();
+
+		try {
+			actorRef = actorSystem.actorOf(
+				Props.create(
+					JobClientActorTest.PlainActor.class,
+					leaderId));
+
+			final String expectedAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+			final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(expectedAddress, leaderId);
+
+			highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
+
+			ClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
+
+			ActorGateway gateway = client.getJobManagerGateway();
+
+			assertEquals(expectedAddress, gateway.path());
+			assertEquals(leaderId, gateway.leaderSessionID());
+		} finally {
+			if (actorRef != null) {
+				TestingUtils.stopActorGracefully(actorRef);
+			}
+
+			actorSystem.shutdown();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 009bec6..6b861a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -137,8 +137,8 @@ public class LeaderRetrievalUtils {
 	}
 
 	public static InetAddress findConnectingAddress(
-		LeaderRetrievalService leaderRetrievalService,
-		Time timeout) throws LeaderRetrievalException {
+			LeaderRetrievalService leaderRetrievalService,
+			Time timeout) throws LeaderRetrievalException {
 		return findConnectingAddress(leaderRetrievalService, new FiniteDuration(timeout.getSize(), timeout.getUnit()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 0091571..7c6f73a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -73,7 +75,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	/**
 	 * Ensure that that Akka configuration parameters can be set.
 	 */
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected=FlinkException.class)
 	public void testInvalidAkkaConfiguration() throws Throwable {
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
@@ -86,11 +88,11 @@ public class RemoteEnvironmentITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 
 		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-		result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
+		result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
 		try {
 			env.execute();
 			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (IOException ex) {
+		} catch (ProgramInvocationException ex) {
 			throw ex.getCause();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d2465159/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e70af09..8f47b18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.ActorRef;
 
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
@@ -569,17 +571,29 @@ public class YarnClusterClient extends ClusterClient {
 		 * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
 		 * @return ActorSystem
 		 */
-		public ActorRef get() {
+		public ActorRef get() throws FlinkException {
 			if (applicationClient == null) {
 				// start application client
 				LOG.info("Start application client.");
 
-				applicationClient = actorSystemLoader.get().actorOf(
-					Props.create(
-						ApplicationClient.class,
-						flinkConfig,
-						highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
-					"applicationClient");
+				final ActorSystem actorSystem;
+
+				try {
+					actorSystem = actorSystemLoader.get();
+				} catch (FlinkException fle) {
+					throw new FlinkException("Could not start the ClusterClient's ActorSystem.", fle);
+				}
+
+				try {
+					applicationClient = actorSystem.actorOf(
+						Props.create(
+							ApplicationClient.class,
+							flinkConfig,
+							highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
+						"applicationClient");
+				} catch (Exception e) {
+					throw new FlinkException("Could not start the ApplicationClient.", e);
+				}
 			}
 
 			return applicationClient;


[2/2] flink git commit: [FLINK-6635] [test] Fix ClientConnectionTest

Posted by tr...@apache.org.
[FLINK-6635] [test] Fix ClientConnectionTest

The ClientConnectionTest passed even though the tested logic was failing, because we
were expecting an exception and checking for a special word to be contained in the
exception's message. Unfortunately, we generated an AssertionError with the same
word if the actual logic we wanted to test failed. That caused the test to pass.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3979616
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3979616
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3979616

Branch: refs/heads/master
Commit: e3979616b4cd40db8f96bd661d52c37fcf84d57c
Parents: 392bc71
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri May 19 12:01:51 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 22 10:32:45 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     | 16 +++-
 .../client/program/StandaloneClusterClient.java |  4 +-
 .../client/program/ClientConnectionTest.java    | 93 ++++----------------
 .../runtime/util/LeaderRetrievalUtils.java      |  2 +-
 .../flink/core/testutils/CommonTestUtils.java   | 22 +++++
 5 files changed, 56 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index b081721..e09a0b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
@@ -56,6 +57,7 @@ import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -733,10 +735,16 @@ public abstract class ClusterClient {
 	 */
 	public ActorGateway getJobManagerGateway() throws Exception {
 		LOG.debug("Looking up JobManager");
-		return LeaderRetrievalUtils.retrieveLeaderGateway(
-			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-			actorSystemLoader.get(),
-			lookupTimeout);
+
+		try {
+			return LeaderRetrievalUtils.retrieveLeaderGateway(
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				actorSystemLoader.get(),
+				lookupTimeout);
+		} catch (LeaderRetrievalException lre) {
+			throw new FlinkException("Could not connect to the leading JobManager. Please check that the " +
+				"JobManager is running.", lre);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 7517504..b00e519 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -52,7 +52,7 @@ public class StandaloneClusterClient extends ClusterClient {
 
 	@Override
 	public String getWebInterfaceURL() {
-		String host = this.getJobManagerAddress().getHostString();
+		String host = getJobManagerAddress().getHostString();
 		int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
 		return "http://" +  host + ":" + port;
 	}
@@ -70,7 +70,7 @@ public class StandaloneClusterClient extends ClusterClient {
 				throw new RuntimeException("Received the wrong reply " + result + " from cluster.");
 			}
 		} catch (Exception e) {
-			throw new RuntimeException("Couldn't retrieve the Cluster status.", e);
+			throw new RuntimeException("Couldn't retrieve the cluster status.", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index fc24a9d..3bfaa95 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -20,14 +20,14 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -35,17 +35,16 @@ import static org.junit.Assert.*;
  * This test starts a job client without the JobManager being reachable. It
  * tests for a timely error and a meaningful error message.
  */
-public class ClientConnectionTest {
+public class ClientConnectionTest extends TestLogger {
 
-	private static final long CONNECT_TIMEOUT = 2 * 1000; // 2 seconds
-	private static final long ASK_STARTUP_TIMEOUT = 100 * 1000; // 100 seconds
-	private static final long MAX_DELAY = 50 * 1000; // less than the startup timeout
+	private static final long CONNECT_TIMEOUT = 100L; // 100 ms
+	private static final long ASK_STARTUP_TIMEOUT = 20000L; // 20 seconds
 
 	/**
 	 * Tests the behavior against a LOCAL address where no job manager is running.
 	 */
 	@Test
-	public void testExceptionWhenLocalJobManagerUnreachablelocal() {
+	public void testExceptionWhenLocalJobManagerUnreachablelocal() throws Exception {
 
 		final InetSocketAddress unreachableEndpoint;
 		try {
@@ -64,7 +63,7 @@ public class ClientConnectionTest {
 	 * Tests the behavior against a REMOTE address where no job manager is running.
 	 */
 	@Test
-	public void testExceptionWhenRemoteJobManagerUnreachable() {
+	public void testExceptionWhenRemoteJobManagerUnreachable() throws Exception {
 
 		final InetSocketAddress unreachableEndpoint;
 		try {
@@ -79,78 +78,24 @@ public class ClientConnectionTest {
 		testFailureBehavior(unreachableEndpoint);
 	}
 
-	private void testFailureBehavior(final InetSocketAddress unreachableEndpoint) {
+	private static void testFailureBehavior(final InetSocketAddress unreachableEndpoint) throws Exception {
 
 		final Configuration config = new Configuration();
-		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s");
-		config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT/1000) + " s");
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT) + " ms");
+		config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, (CONNECT_TIMEOUT) + " ms");
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
 		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
 
+		ClusterClient client = new StandaloneClusterClient(config);
 
 		try {
-			JobVertex vertex = new JobVertex("Test Vertex");
-			vertex.setInvokableClass(TestInvokable.class);
-
-			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-			Thread invoker = new Thread("test invoker") {
-				@Override
-				public void run() {
-					try {
-						new StandaloneClusterClient(config);
-						fail("This should fail with an exception since the JobManager is unreachable.");
-					}
-					catch (Throwable t) {
-						synchronized (error) {
-							error.set(t);
-							error.notifyAll();
-						}
-					}
-				}
-			};
-
-			invoker.setDaemon(true);
-			invoker.start();
-
-			try {
-				// wait until the caller is successful, for at most the given time
-				long now = System.nanoTime();
-				long deadline = now + MAX_DELAY * 1_000_000;
-
-				synchronized (error) {
-					while (invoker.isAlive() && error.get() == null && now < deadline) {
-						error.wait(1000);
-						now = System.nanoTime();
-					}
-				}
-
-				Throwable t = error.get();
-				if (t == null) {
-					fail("Job invocation did not fail in expected time interval.");
-				}
-				else {
-					assertNotNull(t.getMessage());
-					assertTrue(t.getMessage(), t.getMessage().contains("JobManager"));
-				}
-			}
-			finally {
-				if (invoker.isAlive()) {
-					invoker.interrupt();
-				}
-			}
+			// we have to query the cluster status to start the connection attempts
+			client.getClusterStatus();
+			fail("This should fail with an exception since the endpoint is unreachable.");
+		} catch (Exception e) {
+			// check that we have failed with a LeaderRetrievalException which says that we could
+			// not connect to the leading JobManager
+			assertTrue(CommonTestUtils.containsCause(e, LeaderRetrievalException.class));
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static class TestInvokable extends AbstractInvokable {
-
-		@Override
-		public void invoke() {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 073c52b..009bec6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -76,7 +76,7 @@ public class LeaderRetrievalUtils {
 
 			return Await.result(actorGatewayFuture, timeout);
 		} catch (Exception e) {
-			throw new LeaderRetrievalException("Could not retrieve the leader gateway", e);
+			throw new LeaderRetrievalException("Could not retrieve the leader gateway.", e);
 		} finally {
 			try {
 				leaderRetrievalService.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/e3979616/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index cf2bb7f..33811f2 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -302,4 +302,26 @@ public class CommonTestUtils {
 			throw new RuntimeException("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
 		}
 	}
+
+	/**
+	 * Checks whether the given throwable, or any throwable in its cause chain, is of the given type.
+	 * Matching is done by type (subtypes of the given type match as well), not by instance equality.
+	 *
+	 * @param throwable Throwable to check for the cause
+	 * @param cause Cause to look for
+	 * @return True if the given Throwable contains the given cause (type equality); otherwise false
+	 */
+	public static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause) {
+		Throwable current = throwable;
+
+		while (current != null) {
+			if (cause.isAssignableFrom(current.getClass())) {
+				return true;
+			}
+
+			current = current.getCause();
+		}
+
+		return false;
+	}
 }