You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/26 14:41:42 UTC

[16/16] flink git commit: [FLINK-8344][flip6] Retrieve leading WebMonitor in RestClusterClient

[FLINK-8344][flip6] Retrieve leading WebMonitor in RestClusterClient

Make WebMonitorEndpoint instances participate in leader election.
Use the leading instance's base URL to issue HTTP requests from RestClusterClient.
Make polling of JobExecutionResults and savepoints fault tolerant.

[FLINK-8344][flip6] Add TestLogger to unit tests

[FLINK-8344][flip6] Update RestOptions

Declare timeouts and delays with the long data type.
Add descriptions to ConfigOptions.

[FLINK-8344][flip6] Rename methods in RestClusterClient

Rename waitForSavepointCompletion to pollSavepointAsync.
Rename waitForResource to pollResourceAsync.

This closes #5312.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac8225fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac8225fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac8225fd

Branch: refs/heads/master
Commit: ac8225fd56f16b1766724aefbd44babbe322d2ac
Parents: d33aed3
Author: gyao <ga...@data-artisans.com>
Authored: Thu Jan 18 18:17:40 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 26 13:50:25 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 322 ++++++++++++++-----
 .../rest/RestClusterClientConfiguration.java    |  65 ++--
 .../RestClusterClientConfigurationTest.java     |  52 +++
 .../program/rest/RestClusterClientTest.java     | 140 ++++++--
 .../apache/flink/configuration/RestOptions.java |  44 ++-
 .../org/apache/flink/util/ExceptionUtils.java   |   4 +-
 .../flink/util/function/CheckedSupplier.java    |  39 +++
 .../apache/flink/util/ExceptionUtilsTest.java   |   8 +
 .../flink/docs/rest/RestAPIDocGenerator.java    |  38 ++-
 .../webmonitor/RuntimeMonitorHandler.java       |   2 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  40 ++-
 .../dispatcher/DispatcherRestEndpoint.java      |  22 +-
 .../entrypoint/JobClusterEntrypoint.java        |  11 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  19 +-
 .../HighAvailabilityServices.java               |   4 +
 .../HighAvailabilityServicesUtils.java          |  10 +-
 .../nonha/embedded/EmbeddedHaServices.java      |  16 +
 .../nonha/standalone/StandaloneHaServices.java  |  32 +-
 .../zookeeper/ZooKeeperHaServices.java          |  16 +-
 .../jobmaster/JobMasterRestEndpoint.java        |  20 +-
 .../ZooKeeperLeaderElectionService.java         |   7 +
 .../LeaderRetrievalListener.java                |   4 +-
 .../apache/flink/runtime/rest/RestClient.java   |  22 +-
 .../runtime/rest/RestClientConfiguration.java   |  17 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  55 +++-
 .../runtime/concurrent/FutureUtilsTest.java     |  37 +++
 .../TestingHighAvailabilityServices.java        |  22 ++
 .../TestingManualHighAvailabilityServices.java  |  13 +
 .../standalone/StandaloneHaServicesTest.java    |   4 +-
 .../flink/runtime/rest/RestClientTest.java      | 103 ++++++
 .../flink/runtime/rest/RestEndpointITCase.java  |  18 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   6 +-
 .../YarnIntraNonHaMasterServices.java           |  22 ++
 .../YarnPreConfiguredMasterNonHaServices.java   |  22 ++
 34 files changed, 1053 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 348e647..141af71 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -34,18 +34,25 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
@@ -61,29 +68,42 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
+
+import akka.actor.AddressFromURIString;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
@@ -100,20 +120,48 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 	private final T clusterId;
 
+	private final LeaderRetrievalService webMonitorRetrievalService;
+
+	private final LeaderRetrievalService dispatcherRetrievalService;
+
+	private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();
+
+	private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever();
+
+	/** ExecutorService to run operations that can be retried on exceptions. */
+	private ScheduledExecutorService retryExecutorService;
+
 	public RestClusterClient(Configuration config, T clusterId) throws Exception {
 		this(
 			config,
+			null,
 			clusterId,
 			new ExponentialWaitStrategy(10L, 2000L));
 	}
 
 	@VisibleForTesting
-	RestClusterClient(Configuration configuration, T clusterId, WaitStrategy waitStrategy) throws Exception {
+	RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
 		super(configuration);
 		this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
-		this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
+
+		if (restClient != null) {
+			this.restClient = restClient;
+		} else {
+			this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
+		}
+
 		this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
 		this.clusterId = Preconditions.checkNotNull(clusterId);
+
+		this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
+		this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+		this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
+		startLeaderRetrievers();
+	}
+
+	private void startLeaderRetrievers() throws Exception {
+		this.webMonitorRetrievalService.start(webMonitorLeaderRetriever);
+		this.dispatcherRetrievalService.start(dispatcherLeaderRetriever);
 	}
 
 	@Override
@@ -124,8 +172,23 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		} catch (Exception e) {
 			log.error("An error occurred during the client shutdown.", e);
 		}
+
+		ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService);
+
 		this.restClient.shutdown(Time.seconds(5));
 		ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
+
+		try {
+			webMonitorRetrievalService.stop();
+		} catch (Exception e) {
+			log.error("An error occurred during stopping the webMonitorRetrievalService", e);
+		}
+
+		try {
+			dispatcherRetrievalService.stop();
+		} catch (Exception e) {
+			log.error("An error occurred during stopping the dispatcherLeaderRetriever", e);
+		}
 	}
 
 	@Override
@@ -141,16 +204,16 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 		final JobResult jobExecutionResult;
 		try {
-			jobExecutionResult = waitForResource(
+			jobExecutionResult = pollResourceAsync(
 				() -> {
 					final JobMessageParameters messageParameters = new JobMessageParameters();
 					messageParameters.jobPathParameter.resolve(jobGraph.getJobID());
-					return restClient.sendRequest(
-						restClusterClientConfiguration.getRestServerAddress(),
-						restClusterClientConfiguration.getRestServerPort(),
+					return sendRetryableRequest(
 						JobExecutionResultHeaders.getInstance(),
-						messageParameters);
-				});
+						messageParameters,
+						EmptyRequestBody.getInstance(),
+						isConnectionProblemException().or(isHttpStatusUnsuccessfulException()));
+				}).get();
 		} catch (final Exception e) {
 			throw new ProgramInvocationException(e);
 		}
@@ -180,9 +243,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		log.info("Requesting blob server port.");
 		int blobServerPort;
 		try {
-			CompletableFuture<BlobServerPortResponseBody> portFuture = restClient.sendRequest(
-				restClusterClientConfiguration.getRestServerAddress(),
-				restClusterClientConfiguration.getRestServerPort(),
+			CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(
 				BlobServerPortHeaders.getInstance());
 			blobServerPort = portFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS).port;
 		} catch (Exception e) {
@@ -191,7 +252,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 		log.info("Uploading jar files.");
 		try {
-			InetSocketAddress address = new InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), blobServerPort);
+			InetSocketAddress address = new InetSocketAddress(getDispatcherAddress().get(), blobServerPort);
 			List<PermanentBlobKey> keys = BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
 			for (PermanentBlobKey key : keys) {
 				jobGraph.addBlob(key);
@@ -202,9 +263,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 		log.info("Submitting job graph.");
 		try {
-			CompletableFuture<JobSubmitResponseBody> responseFuture = restClient.sendRequest(
-				restClusterClientConfiguration.getRestServerAddress(),
-				restClusterClientConfiguration.getRestServerPort(),
+			CompletableFuture<JobSubmitResponseBody> responseFuture = sendRequest(
 				JobSubmitHeaders.getInstance(),
 				new JobSubmitRequestBody(jobGraph));
 			responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
@@ -218,9 +277,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		JobTerminationMessageParameters params = new JobTerminationMessageParameters();
 		params.jobPathParameter.resolve(jobID);
 		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
-		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
-			restClusterClientConfiguration.getRestServerAddress(),
-			restClusterClientConfiguration.getRestServerPort(),
+		CompletableFuture<EmptyResponseBody> responseFuture = sendRequest(
 			JobTerminationHeaders.getInstance(),
 			params
 		);
@@ -232,9 +289,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		JobTerminationMessageParameters params = new JobTerminationMessageParameters();
 		params.jobPathParameter.resolve(jobID);
 		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
-		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
-			restClusterClientConfiguration.getRestServerAddress(),
-			restClusterClientConfiguration.getRestServerPort(),
+		CompletableFuture<EmptyResponseBody> responseFuture = sendRequest(
 			JobTerminationHeaders.getInstance(),
 			params
 		);
@@ -257,25 +312,15 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 		final CompletableFuture<SavepointTriggerResponseBody> responseFuture;
 
-		try {
-			responseFuture = restClient.sendRequest(
-				restClusterClientConfiguration.getRestServerAddress(),
-				restClusterClientConfiguration.getRestServerPort(),
-				savepointTriggerHeaders,
-				savepointTriggerMessageParameters,
-				new SavepointTriggerRequestBody(savepointDirectory));
-		} catch (IOException e) {
-			throw new FlinkException("Could not send trigger savepoint request to Flink cluster.", e);
-		}
+		responseFuture = sendRequest(
+			savepointTriggerHeaders,
+			savepointTriggerMessageParameters,
+			new SavepointTriggerRequestBody(savepointDirectory));
 
-		return responseFuture.thenApply(savepointTriggerResponseBody -> {
+		return responseFuture.thenCompose(savepointTriggerResponseBody -> {
 			final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId();
-			final SavepointInfo savepointInfo;
-			try {
-				savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId);
-			} catch (Exception e) {
-				throw new CompletionException(e);
-			}
+			return pollSavepointAsync(jobId, savepointTriggerId);
+		}).thenApply(savepointInfo -> {
 			if (savepointInfo.getFailureCause() != null) {
 				throw new CompletionException(savepointInfo.getFailureCause());
 			}
@@ -283,41 +328,36 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		});
 	}
 
-	private SavepointInfo waitForSavepointCompletion(
+	private CompletableFuture<SavepointInfo> pollSavepointAsync(
 			final JobID jobId,
-			final SavepointTriggerId savepointTriggerId) throws Exception {
-		return waitForResource(() -> {
+			final SavepointTriggerId savepointTriggerId) {
+		return pollResourceAsync(() -> {
 			final SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
 			final SavepointStatusMessageParameters savepointStatusMessageParameters =
 				savepointStatusHeaders.getUnresolvedMessageParameters();
 			savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId);
 			savepointStatusMessageParameters.savepointTriggerIdPathParameter.resolve(savepointTriggerId);
-			return restClient.sendRequest(
-				restClusterClientConfiguration.getRestServerAddress(),
-				restClusterClientConfiguration.getRestServerPort(),
+			return sendRetryableRequest(
 				savepointStatusHeaders,
-				savepointStatusMessageParameters
-			);
+				savepointStatusMessageParameters,
+				EmptyRequestBody.getInstance(),
+				isConnectionProblemException());
 		});
 	}
 
 	@Override
 	public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
-		JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance();
-		CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
-			restClusterClientConfiguration.getRestServerAddress(),
-			restClusterClientConfiguration.getRestServerPort(),
-			headers
-		);
-		return jobDetailsFuture
+		return sendRequest(JobsOverviewHeaders.getInstance())
 			.thenApply(
-				(MultipleJobsDetails multipleJobsDetails) -> {
-					final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs();
-					Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
-					jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
-
-					return flattenedDetails;
-			});
+				(multipleJobsDetails) -> multipleJobsDetails
+					.getJobs()
+					.stream()
+					.map(detail -> new JobStatusMessage(
+						detail.getJobId(),
+						detail.getJobName(),
+						detail.getStatus(),
+						detail.getStartTime()))
+					.collect(Collectors.toList()));
 	}
 
 	@Override
@@ -325,21 +365,43 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		return clusterId;
 	}
 
-	private <R, A extends AsynchronouslyCreatedResource<R>> R waitForResource(
-			final SupplierWithException<CompletableFuture<A>, IOException> resourceFutureSupplier)
-				throws IOException, InterruptedException, ExecutionException, TimeoutException {
-		A asynchronouslyCreatedResource;
-		long attempt = 0;
-		while (true) {
-			final CompletableFuture<A> responseFuture = resourceFutureSupplier.get();
-			asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-			if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
-				break;
+	/**
+	 * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until
+	 * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes
+	 * {@link QueueStatus.Id#COMPLETED COMPLETED}. The future completes with the result of
+	 * {@link AsynchronouslyCreatedResource#resource()}.
+	 *
+	 * @param resourceFutureSupplier The operation which polls for the
+	 *                               {@code AsynchronouslyCreatedResource}.
+	 * @param <R>                    The type of the resource.
+	 * @param <A>                    The type of the {@code AsynchronouslyCreatedResource}.
+	 * @return A {@code CompletableFuture} delivering the resource.
+	 */
+	private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(
+			final Supplier<CompletableFuture<A>> resourceFutureSupplier) {
+		return pollResourceAsync(resourceFutureSupplier, new CompletableFuture<>(), 0);
+	}
+
+	private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(
+			final Supplier<CompletableFuture<A>> resourceFutureSupplier,
+			final CompletableFuture<R> resultFuture,
+			final long attempt) {
+
+		resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, throwable) -> {
+			if (throwable != null) {
+				resultFuture.completeExceptionally(throwable);
+			} else {
+				if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
+					resultFuture.complete(asynchronouslyCreatedResource.resource());
+				} else {
+					retryExecutorService.schedule(() -> {
+						pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1);
+					}, waitStrategy.sleepTime(attempt), TimeUnit.MILLISECONDS);
+				}
 			}
-			Thread.sleep(waitStrategy.sleepTime(attempt));
-			attempt++;
-		}
-		return asynchronouslyCreatedResource.resource();
+		});
+
+		return resultFuture;
 	}
 
 	// ======================================
@@ -358,7 +420,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 
 	@Override
 	public String getWebInterfaceURL() {
-		return "http://" + restClusterClientConfiguration.getRestServerAddress() + ':' + restClusterClientConfiguration.getRestServerPort();
+		return getWebMonitorBaseUrl().toString();
 	}
 
 	@Override
@@ -375,4 +437,102 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 	public int getMaxSlots() {
 		return 0;
 	}
+
+	//-------------------------------------------------------------------------
+	// RestClient Helper
+	//-------------------------------------------------------------------------
+
+	private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P>
+			sendRequest(M messageHeaders, U messageParameters) {
+		return sendRequest(messageHeaders, messageParameters, EmptyRequestBody.getInstance());
+	}
+
+	private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+			sendRequest(M messageHeaders, R request) {
+		return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), request);
+	}
+
+	private <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P>
+			sendRequest(M messageHeaders) {
+		return sendRequest(messageHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance());
+	}
+
+	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+			sendRequest(M messageHeaders, U messageParameters, R request) {
+		return getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
+			try {
+				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+			} catch (IOException e) {
+				throw new CompletionException(e);
+			}
+		});
+	}
+
+	private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+			sendRetryableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
+		return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
+			try {
+				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+			} catch (IOException e) {
+				throw new CompletionException(e);
+			}
+		}), retryPredicate);
+	}
+
+	private <C> CompletableFuture<C> retry(
+			CheckedSupplier<CompletableFuture<C>> operation,
+			Predicate<Throwable> retryPredicate) {
+		return FutureUtils.retryWithDelay(
+			CheckedSupplier.unchecked(operation),
+			restClusterClientConfiguration.getRetryMaxAttempts(),
+			Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
+			retryPredicate,
+			new ScheduledExecutorServiceAdapter(retryExecutorService));
+	}
+
+	private static Predicate<Throwable> isConnectionProblemException() {
+		return (throwable) ->
+			ExceptionUtils.findThrowable(throwable, java.net.ConnectException.class).isPresent() ||
+				ExceptionUtils.findThrowable(throwable, java.net.SocketTimeoutException.class).isPresent() ||
+				ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent() ||
+				ExceptionUtils.findThrowable(throwable, IOException.class).isPresent();
+	}
+
+	private static Predicate<Throwable> isHttpStatusUnsuccessfulException() {
+		return (throwable) -> ExceptionUtils.findThrowable(throwable, RestClientException.class)
+				.map(restClientException -> {
+					final int code = restClientException.getHttpResponseStatus().code();
+					return code < 200 || code > 299;
+				})
+				.orElse(false);
+	}
+
+	private CompletableFuture<URL> getWebMonitorBaseUrl() {
+		return FutureUtils.orTimeout(
+				webMonitorLeaderRetriever.getLeaderFuture(),
+				restClusterClientConfiguration.getAwaitLeaderTimeout(),
+				TimeUnit.MILLISECONDS)
+			.thenApplyAsync(leaderAddressSessionId -> {
+				final String url = leaderAddressSessionId.f0;
+				try {
+					return new URL(url);
+				} catch (MalformedURLException e) {
+					throw new IllegalArgumentException("Could not parse URL from " + url, e);
+				}
+			}, executorService);
+	}
+
+	private CompletableFuture<String> getDispatcherAddress() {
+		return FutureUtils.orTimeout(
+				dispatcherLeaderRetriever.getLeaderFuture(),
+				restClusterClientConfiguration.getAwaitLeaderTimeout(),
+				TimeUnit.MILLISECONDS)
+			.thenApplyAsync(leaderAddressSessionId -> {
+				final String address = leaderAddressSessionId.f0;
+				final Option<String> host = AddressFromURIString.parse(address).host();
+				checkArgument(host.isDefined(), "Could not parse host from %s", address);
+				return host.get();
+			}, executorService);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
index 788eba9..2d58dce 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
@@ -19,60 +19,73 @@
 package org.apache.flink.client.program.rest;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A configuration object for {@link RestClusterClient}s.
  */
 public final class RestClusterClientConfiguration {
 
-	private final String blobServerAddress;
-
 	private final RestClientConfiguration restClientConfiguration;
 
-	private final String restServerAddress;
+	private final long awaitLeaderTimeout;
+
+	private final int retryMaxAttempts;
 
-	private final int restServerPort;
+	private final long retryDelay;
 
 	private RestClusterClientConfiguration(
-			String blobServerAddress,
-			RestClientConfiguration endpointConfiguration,
-			String restServerAddress,
-			int restServerPort) {
-		this.blobServerAddress = Preconditions.checkNotNull(blobServerAddress);
+			final RestClientConfiguration endpointConfiguration,
+			final long awaitLeaderTimeout,
+			final int retryMaxAttempts,
+			final long retryDelay) {
+		checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
+		checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
+		checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");
+
 		this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
-		this.restServerAddress = Preconditions.checkNotNull(restServerAddress);
-		this.restServerPort = restServerPort;
+		this.awaitLeaderTimeout = awaitLeaderTimeout;
+		this.retryMaxAttempts = retryMaxAttempts;
+		this.retryDelay = retryDelay;
 	}
 
-	public String getBlobServerAddress() {
-		return blobServerAddress;
+	public RestClientConfiguration getRestClientConfiguration() {
+		return restClientConfiguration;
 	}
 
-	public String getRestServerAddress() {
-		return restServerAddress;
+	/**
+	 * @see RestOptions#AWAIT_LEADER_TIMEOUT
+	 */
+	public long getAwaitLeaderTimeout() {
+		return awaitLeaderTimeout;
 	}
 
-	public int getRestServerPort() {
-		return restServerPort;
+	/**
+	 * @see RestOptions#RETRY_MAX_ATTEMPTS
+	 */
+	public int getRetryMaxAttempts() {
+		return retryMaxAttempts;
 	}
 
-	public RestClientConfiguration getRestClientConfiguration() {
-		return restClientConfiguration;
+	/**
+	 * @see RestOptions#RETRY_DELAY
+	 */
+	public long getRetryDelay() {
+		return retryDelay;
 	}
 
 	public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
-		String blobServerAddress = config.getString(JobManagerOptions.ADDRESS);
-
-		String serverAddress = config.getString(RestOptions.REST_ADDRESS);
-		int serverPort = config.getInteger(RestOptions.REST_PORT);
-
 		RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);
 
-		return new RestClusterClientConfiguration(blobServerAddress, restClientConfiguration, serverAddress, serverPort);
+		final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
+		final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
+		final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);
+
+		return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
new file mode 100644
index 0000000..30cf926
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that {@link RestClusterClientConfiguration} picks up the values set via {@link RestOptions}.
+ */
+public class RestClusterClientConfigurationTest extends TestLogger {
+
+	private RestClusterClientConfiguration clientConfiguration;
+
+	@Before
+	public void setUp() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1L);
+		configuration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2);
+		configuration.setLong(RestOptions.RETRY_DELAY, 3L);
+		clientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
+	}
+
+	@Test
+	public void testConfiguration() {
+		assertEquals(1L, clientConfiguration.getAwaitLeaderTimeout());
+		assertEquals(2, clientConfiguration.getRetryMaxAttempts());
+		assertEquals(3L, clientConfiguration.getRetryDelay());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f7cda03..c880817 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program.rest;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -25,6 +26,7 @@ import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +36,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -70,9 +74,12 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMes
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -90,6 +97,7 @@ import org.mockito.MockitoAnnotations;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -99,12 +107,17 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
@@ -130,6 +143,10 @@ public class RestClusterClientTest extends TestLogger {
 
 	private RestClusterClient<StandaloneClusterId> restClusterClient;
 
+	private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();
+
+	private ExecutorService executor;
+
 	@Before
 	public void setUp() throws Exception {
 		MockitoAnnotations.initMocks(this);
@@ -137,9 +154,30 @@ public class RestClusterClientTest extends TestLogger {
 
 		final Configuration config = new Configuration();
 		config.setString(JobManagerOptions.ADDRESS, "localhost");
+		config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
+		config.setLong(RestOptions.RETRY_DELAY, 0);
+
 		restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(config);
 		mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
-		restClusterClient = new RestClusterClient(config, StandaloneClusterId.getInstance(), (attempt) -> 0);
+
+		executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName()));
+		final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), executor) {
+			@Override
+			public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+			sendRequest(
+					final String targetAddress,
+					final int targetPort,
+					final M messageHeaders,
+					final U messageParameters,
+					final R request) throws IOException {
+				if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
+					return FutureUtils.completedExceptionally(new IOException("expected"));
+				} else {
+					return super.sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request);
+				}
+			}
+		};
+		restClusterClient = new RestClusterClient<>(config, restClient, StandaloneClusterId.getInstance(), (attempt) -> 0);
 	}
 
 	@After
@@ -147,6 +185,10 @@ public class RestClusterClientTest extends TestLogger {
 		if (restClusterClient != null) {
 			restClusterClient.shutdown();
 		}
+
+		if (executor != null) {
+			executor.shutdown();
+		}
 	}
 
 	@Test
@@ -158,11 +200,11 @@ public class RestClusterClientTest extends TestLogger {
 		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
 		TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
 		TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(Collections.singletonList(
+			new TestJobExecutionResultHandler(
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
 					.jobId(id)
 					.netRuntime(Long.MAX_VALUE)
-					.build())).iterator());
+					.build()));
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
 			portHandler,
@@ -239,14 +281,17 @@ public class RestClusterClientTest extends TestLogger {
 	private class TestJobExecutionResultHandler
 		extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
 
-		private final Iterator<JobExecutionResultResponseBody> jobExecutionResults;
+		private final Iterator<Object> jobExecutionResults;
 
-		private JobExecutionResultResponseBody lastJobExecutionResult;
+		private Object lastJobExecutionResult;
 
 		private TestJobExecutionResultHandler(
-				final Iterator<JobExecutionResultResponseBody> jobExecutionResults) {
+				final Object... jobExecutionResults) {
 			super(JobExecutionResultHeaders.getInstance());
-			this.jobExecutionResults = jobExecutionResults;
+			checkArgument(Arrays.stream(jobExecutionResults)
+				.allMatch(object -> object instanceof JobExecutionResultResponseBody
+					|| object instanceof RestHandlerException));
+			this.jobExecutionResults = Arrays.asList(jobExecutionResults).iterator();
 		}
 
 		@Override
@@ -257,7 +302,13 @@ public class RestClusterClientTest extends TestLogger {
 				lastJobExecutionResult = jobExecutionResults.next();
 			}
 			checkState(lastJobExecutionResult != null);
-			return CompletableFuture.completedFuture(lastJobExecutionResult);
+			if (lastJobExecutionResult instanceof JobExecutionResultResponseBody) {
+				return CompletableFuture.completedFuture((JobExecutionResultResponseBody) lastJobExecutionResult);
+			} else if (lastJobExecutionResult instanceof RestHandlerException) {
+				return FutureUtils.completedExceptionally((RestHandlerException) lastJobExecutionResult);
+			} else {
+				throw new AssertionError();
+			}
 		}
 	}
 
@@ -267,7 +318,8 @@ public class RestClusterClientTest extends TestLogger {
 		final JobID jobId = jobGraph.getJobID();
 
 		final TestJobExecutionResultHandler testJobExecutionResultHandler =
-			new TestJobExecutionResultHandler(Arrays.asList(
+			new TestJobExecutionResultHandler(
+				new RestHandlerException("should trigger retry", HttpResponseStatus.NOT_FOUND),
 				JobExecutionResultResponseBody.inProgress(),
 				JobExecutionResultResponseBody.created(new JobResult.Builder()
 					.jobId(jobId)
@@ -278,17 +330,23 @@ public class RestClusterClientTest extends TestLogger {
 					.jobId(jobId)
 					.netRuntime(Long.MAX_VALUE)
 					.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
-					.build())).iterator());
+					.build()));
+
+		// fail first HTTP polling attempt, which should not be a problem because of the retries
+		final AtomicBoolean firstPollFailed = new AtomicBoolean();
+		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+			messageHeaders instanceof JobExecutionResultHeaders && !firstPollFailed.getAndSet(true);
 
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
 			testJobExecutionResultHandler,
 			new TestBlobServerPortHandler(),
 			new TestJobSubmitHandler())) {
 
-			final org.apache.flink.api.common.JobExecutionResult jobExecutionResult =
-				(org.apache.flink.api.common.JobExecutionResult) restClusterClient.submitJob(
-					jobGraph,
-					ClassLoader.getSystemClassLoader());
+			JobExecutionResult jobExecutionResult;
+
+			jobExecutionResult = (JobExecutionResult) restClusterClient.submitJob(
+				jobGraph,
+				ClassLoader.getSystemClassLoader());
 			assertThat(jobExecutionResult.getJobID(), equalTo(jobId));
 			assertThat(jobExecutionResult.getNetRuntime(), equalTo(Long.MAX_VALUE));
 			assertThat(
@@ -314,9 +372,9 @@ public class RestClusterClientTest extends TestLogger {
 		final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
 		final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
 			testSavepointHandlers.new TestSavepointTriggerHandler(
-				Arrays.asList(null, targetSavepointDirectory, null).iterator());
+				null, targetSavepointDirectory, null);
 		final TestSavepointHandlers.TestSavepointHandler savepointHandler =
-			testSavepointHandlers.new TestSavepointHandler(Arrays.asList(
+			testSavepointHandlers.new TestSavepointHandler(
 				new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
 					testSavepointHandlers.testSavepointTriggerId,
 					savepointLocationDefaultDir,
@@ -328,7 +386,14 @@ public class RestClusterClientTest extends TestLogger {
 				new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
 					testSavepointHandlers.testSavepointTriggerId,
 					null,
-					new SerializedThrowable(new RuntimeException("expected"))))).iterator());
+					new SerializedThrowable(new RuntimeException("expected")))),
+				new RestHandlerException("not found", HttpResponseStatus.NOT_FOUND));
+
+		// fail first HTTP polling attempt, which should not be a problem because of the retries
+		final AtomicBoolean firstPollFailed = new AtomicBoolean();
+		failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
+			messageHeaders instanceof SavepointStatusHeaders && !firstPollFailed.getAndSet(true);
+
 		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
 			triggerHandler,
 			savepointHandler)) {
@@ -358,6 +423,14 @@ public class RestClusterClientTest extends TestLogger {
 						.getMessage(), equalTo("expected"));
 				}
 			}
+
+			try {
+				restClusterClient.triggerSavepoint(new JobID(), null).get();
+				fail("Expected exception not thrown."); // otherwise a missing exception would go unnoticed
+			} catch (final ExecutionException e) {
+				assertTrue("RestClientException not in causal chain",
+					ExceptionUtils.findThrowable(e, RestClientException.class).isPresent());
+			}
 		}
 	}
 
@@ -369,9 +442,9 @@ public class RestClusterClientTest extends TestLogger {
 
 			private final Iterator<String> expectedTargetDirectories;
 
-			TestSavepointTriggerHandler(final Iterator<String> expectedTargetDirectories) {
+			TestSavepointTriggerHandler(final String... expectedTargetDirectories) {
 				super(SavepointTriggerHeaders.getInstance());
-				this.expectedTargetDirectories = expectedTargetDirectories;
+				this.expectedTargetDirectories = Arrays.asList(expectedTargetDirectories).iterator();
 			}
 
 			@Override
@@ -393,11 +466,14 @@ public class RestClusterClientTest extends TestLogger {
 		private class TestSavepointHandler
 				extends TestHandler<EmptyRequestBody, SavepointResponseBody, SavepointStatusMessageParameters> {
 
-			private final Iterator<SavepointResponseBody> expectedSavepointResponseBodies;
+			private final Iterator<Object> expectedSavepointResponseBodies;
 
-			TestSavepointHandler(final Iterator<SavepointResponseBody> expectedSavepointResponseBodies) {
+			TestSavepointHandler(final Object... expectedSavepointResponseBodies) {
 				super(SavepointStatusHeaders.getInstance());
-				this.expectedSavepointResponseBodies = expectedSavepointResponseBodies;
+				checkArgument(Arrays.stream(expectedSavepointResponseBodies)
+					.allMatch(response -> response instanceof SavepointResponseBody ||
+						response instanceof RestHandlerException));
+				this.expectedSavepointResponseBodies = Arrays.asList(expectedSavepointResponseBodies).iterator();
 			}
 
 			@Override
@@ -406,7 +482,14 @@ public class RestClusterClientTest extends TestLogger {
 					@Nonnull DispatcherGateway gateway) throws RestHandlerException {
 				final SavepointTriggerId savepointTriggerId = request.getPathParameter(SavepointTriggerIdPathParameter.class);
 				if (testSavepointTriggerId.equals(savepointTriggerId)) {
-					return CompletableFuture.completedFuture(expectedSavepointResponseBodies.next());
+					final Object response = expectedSavepointResponseBodies.next();
+					if (response instanceof SavepointResponseBody) {
+						return CompletableFuture.completedFuture((SavepointResponseBody) response);
+					} else if (response instanceof RestHandlerException) {
+						return FutureUtils.completedExceptionally((RestHandlerException) response);
+					} else {
+						throw new AssertionError();
+					}
 				} else {
 					return FutureUtils.completedExceptionally(
 						new RestHandlerException(
@@ -490,4 +573,15 @@ public class RestClusterClientTest extends TestLogger {
 			shutdown(Time.seconds(5));
 		}
 	}
+
+	@FunctionalInterface
+	private interface FailHttpRequestPredicate {
+		/** Decides whether the given HTTP request should be failed instead of being sent. */
+		boolean test(MessageHeaders<?, ?, ?> messageHeaders, MessageParameters messageParameters, RequestBody requestBody);
+		/** Returns a predicate that lets every request pass. */
+		static FailHttpRequestPredicate never() {
+			return (headers, parameters, body) -> false;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 906e266..16fd40d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -33,12 +33,52 @@ public class RestOptions {
 	 */
 	public static final ConfigOption<String> REST_ADDRESS =
 		key("rest.address")
-			.defaultValue("localhost");
+			.defaultValue("localhost")
+			.withDescription("The address that the server binds itself to / the client connects to.");
 
 	/**
 	 * The port that the server listens on / the client connects to.
 	 */
 	public static final ConfigOption<Integer> REST_PORT =
 		key("rest.port")
-			.defaultValue(9067);
+			.defaultValue(9067)
+			.withDescription("The port that the server listens on / the client connects to.");
+
+	/**
+	 * The time in ms that the client waits for the leader address, e.g., Dispatcher or
+	 * WebMonitorEndpoint.
+	 */
+	public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT =
+		key("rest.await-leader-timeout")
+			.defaultValue(30_000L)
+			.withDescription("The time in ms that the client waits for the leader address, e.g., " +
+				"Dispatcher or WebMonitorEndpoint");
+
+	/**
+	 * The number of retries the client will attempt if a retryable operation fails.
+	 * @see #RETRY_DELAY
+	 */
+	public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
+		key("rest.retry.max-attempts")
+			.defaultValue(20)
+			.withDescription("The number of retries the client will attempt if a retryable " +
+				"operation fails.");
+
+	/**
+	 * The time in ms that the client waits between retries.
+	 * @see #RETRY_MAX_ATTEMPTS
+	 */
+	public static final ConfigOption<Long> RETRY_DELAY =
+		key("rest.retry.delay")
+			.defaultValue(3_000L)
+			.withDescription(String.format("The time in ms that the client waits between retries " +
+				"(See also `%s`).", RETRY_MAX_ATTEMPTS.key()));
+
+	/**
+	 * The maximum time in ms for the client to establish a TCP connection.
+	 */
+	public static final ConfigOption<Long> CONNECTION_TIMEOUT =
+		key("rest.connection-timeout")
+			.defaultValue(15_000L)
+			.withDescription("The maximum time in ms for the client to establish a TCP connection.");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 5c69564..ca55c5b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -286,7 +286,7 @@ public final class ExceptionUtils {
 	 * @param searchType the type of exception to search for in the chain.
 	 * @return Optional throwable of the requested type if available, otherwise empty
 	 */
-	public static Optional<Throwable> findThrowable(Throwable throwable, Class<?> searchType) {
+	public static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
 		if (throwable == null || searchType == null) {
 			return Optional.empty();
 		}
@@ -294,7 +294,7 @@ public final class ExceptionUtils {
 		Throwable t = throwable;
 		while (t != null) {
 			if (searchType.isAssignableFrom(t.getClass())) {
-				return Optional.of(t);
+				return Optional.of(searchType.cast(t));
 			} else {
 				t = t.getCause();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java b/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
new file mode 100644
index 0000000..a0bcc13
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link java.util.function.Supplier}-like interface whose {@code get()} may throw an {@link Exception}.
+ */
+@FunctionalInterface
+public interface CheckedSupplier<R> extends SupplierWithException<R, Exception> {
+
+	/** Adapts a checked supplier to a {@link Supplier}; any exception is rethrown wrapped in a {@link RuntimeException}. */
+	static <R> Supplier<R> unchecked(CheckedSupplier<R> checkedSupplier) {
+		return () -> {
+			try {
+				return checkedSupplier.get();
+			} catch (Exception cause) {
+				throw new RuntimeException(cause);
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
index 20a46ad..07978a5 100644
--- a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -61,4 +61,12 @@ public class ExceptionUtilsTest extends TestLogger {
 		// non-fatal error is not rethrown
 		ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError());
 	}
+
+	@Test
+	public void testFindThrowableByType() {
+		assertTrue(ExceptionUtils.findThrowable(
+			new RuntimeException(new IllegalStateException()),
+			IllegalStateException.class).isPresent());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 964b63b..b5d327f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ConfigurationException;
@@ -54,6 +57,7 @@ import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -285,13 +289,45 @@ public class RestAPIDocGenerator {
 		}
 
 		private DocumentingDispatcherRestEndpoint() {
-			super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever);
+			super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);
 		}
 
 		@Override
 		public List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
 			return super.initializeHandlers(restAddressFuture);
 		}
+
+		private enum NoOpElectionService implements LeaderElectionService {
+			INSTANCE;
+			@Override
+			public void start(final LeaderContender contender) throws Exception {
+				// intentionally a no-op: the contender is ignored
+			}
+
+			@Override
+			public void stop() throws Exception {
+				// intentionally a no-op: start() does nothing, so there is nothing to stop
+			}
+
+			@Override
+			public void confirmLeaderSessionID(final UUID leaderSessionID) {
+				// intentionally a no-op: the session ID is ignored
+			}
+
+			@Override
+			public boolean hasLeadership() {
+				return false;
+			}
+		}
+
+		private enum NoOpFatalErrorHandler implements FatalErrorHandler {
+			INSTANCE;
+
+			@Override
+			public void onFatalError(final Throwable exception) {
+				// intentionally a no-op: the reported error is dropped
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index fd6b2ca..7109171 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -119,7 +119,7 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im
 				if (throwable != null) {
 					LOG.debug("Error while handling request.", throwable);
 
-					Optional<Throwable> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);
+					Optional<NotFoundException> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);
 
 					if (optNotFound.isPresent()) {
 						// this should result in a 404 error code (not found)

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 7195957..17381a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import scala.concurrent.Future;
@@ -127,6 +128,7 @@ public class FutureUtils {
 	 * @param operation to retry
 	 * @param retries number of retries
 	 * @param retryDelay delay between retries
+	 * @param retryPredicate Predicate to test whether an exception is retryable
 	 * @param scheduledExecutor executor to be used for the retry operation
 	 * @param <T> type of the result
 	 * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
@@ -135,6 +137,7 @@ public class FutureUtils {
 			final Supplier<CompletableFuture<T>> operation,
 			final int retries,
 			final Time retryDelay,
+			final Predicate<Throwable> retryPredicate,
 			final ScheduledExecutor scheduledExecutor) {
 
 		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
@@ -144,16 +147,41 @@ public class FutureUtils {
 			operation,
 			retries,
 			retryDelay,
+			retryPredicate,
 			scheduledExecutor);
 
 		return resultFuture;
 	}
 
+	/**
+	 * Retry the given operation with the given delay in between failures, using a retry
+	 * predicate that accepts every failure.
+	 *
+	 * @param operation to retry
+	 * @param retries number of retries
+	 * @param retryDelay delay between retries
+	 * @param scheduledExecutor executor to be used for the retry operation
+	 * @param <T> type of the result
+	 * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
+	 */
+	public static <T> CompletableFuture<T> retryWithDelay(
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor) {
+		// delegate with a predicate that accepts any failure
+		final Predicate<Throwable> retryOnAnyFailure = failure -> true;
+
+		return retryWithDelay(
+			operation, retries, retryDelay, retryOnAnyFailure, scheduledExecutor);
+	}
+
 	private static <T> void retryOperationWithDelay(
 			final CompletableFuture<T> resultFuture,
 			final Supplier<CompletableFuture<T>> operation,
 			final int retries,
 			final Time retryDelay,
+			final Predicate<Throwable> retryPredicate,
 			final ScheduledExecutor scheduledExecutor) {
 
 		if (!resultFuture.isDone()) {
@@ -165,17 +193,21 @@ public class FutureUtils {
 						if (throwable instanceof CancellationException) {
 							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
 						} else {
-							if (retries > 0) {
+							if (retries > 0 && retryPredicate.test(throwable)) {
 								final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
-									() -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor),
+									() -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
 									retryDelay.toMilliseconds(),
 									TimeUnit.MILLISECONDS);
 
 								resultFuture.whenComplete(
 									(innerT, innerThrowable) -> scheduledFuture.cancel(false));
 							} else {
-								resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
-									"has been exhausted.", throwable));
+								final String errorMsg = retries == 0 ?
+									"Number of retries has been exhausted." :
+									"Exception is not retryable.";
+								resultFuture.completeExceptionally(new RetryException(
+									"Could not complete the operation. " + errorMsg,
+									throwable));
 							}
 						}
 					} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2ab97e2..0ac64b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -46,13 +48,15 @@ import java.util.concurrent.Executor;
 public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
 
 	public DispatcherRestEndpoint(
-			RestServerEndpointConfiguration endpointConfiguration,
-			GatewayRetriever<DispatcherGateway> leaderRetriever,
-			Configuration clusterConfiguration,
-			RestHandlerConfiguration restConfiguration,
-			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever) {
+		RestServerEndpointConfiguration endpointConfiguration,
+		GatewayRetriever<DispatcherGateway> leaderRetriever,
+		Configuration clusterConfiguration,
+		RestHandlerConfiguration restConfiguration,
+		GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService,
+		FatalErrorHandler fatalErrorHandler) {
 		super(
 			endpointConfiguration,
 			leaderRetriever,
@@ -60,7 +64,9 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			restConfiguration,
 			resourceManagerRetriever,
 			executor,
-			metricQueryServiceRetriever);
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index b90253a..e52f113 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -119,7 +120,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			jobMasterGatewayRetriever,
 			resourceManagerGatewayRetriever,
 			rpcService.getExecutor(),
-			new AkkaQueryServiceRetriever(actorSystem, timeout));
+			new AkkaQueryServiceRetriever(actorSystem, timeout),
+			highAvailabilityServices.getWebMonitorLeaderElectionService());
 
 		LOG.debug("Starting JobMaster REST endpoint.");
 		jobMasterRestEndpoint.start();
@@ -163,7 +165,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			GatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever,
 			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
 			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever) throws ConfigurationException {
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService) throws ConfigurationException {
 
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
 
@@ -174,7 +177,9 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			restHandlerConfiguration,
 			resourceManagerGatewayRetriever,
 			executor,
-			metricQueryServiceRetriever);
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			this);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0b1cea0..fe9bd92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -121,7 +122,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			dispatcherGatewayRetriever,
 			resourceManagerGatewayRetriever,
 			rpcService.getExecutor(),
-			new AkkaQueryServiceRetriever(actorSystem, timeout));
+			new AkkaQueryServiceRetriever(actorSystem, timeout),
+			highAvailabilityServices.getWebMonitorLeaderElectionService());
 
 		LOG.debug("Starting Dispatcher REST endpoint.");
 		dispatcherRestEndpoint.start();
@@ -227,11 +229,12 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	protected DispatcherRestEndpoint createDispatcherRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
+		Configuration configuration,
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService) throws Exception {
 
 		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
 
@@ -242,7 +245,9 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			restHandlerConfiguration,
 			resourceManagerGatewayRetriever,
 			executor,
-			metricQueryServiceRetriever);
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			this);
 	}
 
 	protected Dispatcher createDispatcher(

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index defe5cc..e65e952 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -98,6 +98,8 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	 */
 	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
 
+	LeaderRetrievalService getWebMonitorLeaderRetriever();
+
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
 	 *
@@ -120,6 +122,8 @@ public interface HighAvailabilityServices extends AutoCloseable {
 	 */
 	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 
+	LeaderElectionService getWebMonitorLeaderElectionService();
+
 	/**
 	 * Gets the checkpoint recovery factory for the job manager
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 7a89ed8..4f12f2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -95,10 +97,16 @@ public class HighAvailabilityServicesUtils {
 					addressResolution,
 					configuration);
 
+				final String address = configuration.getString(RestOptions.REST_ADDRESS);
+				final int port = configuration.getInteger(RestOptions.REST_PORT);
+				final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
+				final String protocol = enableSSL ? "https://" : "http://";
+
 				return new StandaloneHaServices(
 					resourceManagerRpcUrl,
 					dispatcherRpcUrl,
-					jobManagerRpcUrl);
+					jobManagerRpcUrl,
+					String.format("%s%s:%s", protocol, address, port));
 			case ZOOKEEPER:
 				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
index 4c30f87..7b2c69b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.concurrent.GuardedBy;
+
 import java.util.HashMap;
 import java.util.concurrent.Executor;
 
@@ -49,11 +50,14 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 
 	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
 
+	private final EmbeddedLeaderService webMonitorLeaderService;
+
 	public EmbeddedHaServices(Executor executor) {
 		this.executor = Preconditions.checkNotNull(executor);
 		this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
 		this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
 		this.jobManagerLeaderServices = new HashMap<>();
+		this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
 	}
 
 	// ------------------------------------------------------------------------
@@ -97,6 +101,11 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+		return webMonitorLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
 	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		checkNotNull(jobID);
 
@@ -107,6 +116,11 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 		}
 	}
 
+	@Override
+	public LeaderElectionService getWebMonitorLeaderElectionService() {
+		return webMonitorLeaderService.createLeaderElectionService();
+	}
+
 	// ------------------------------------------------------------------------
 	// internal
 	// ------------------------------------------------------------------------
@@ -136,6 +150,8 @@ public class EmbeddedHaServices extends AbstractNonHaServices {
 				jobManagerLeaderServices.clear();
 
 				resourceManagerLeaderService.shutdown();
+
+				webMonitorLeaderService.shutdown();
 			}
 
 			super.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
index 617b351..cbfcd49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -51,18 +51,23 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 	/** The fix address of the JobManager */
 	private final String jobManagerAddress;
 
+	private final String webMonitorAddress;
+
 	/**
 	 * Creates a new services class for the fix pre-defined leaders.
-	 * 
+	 *
 	 * @param resourceManagerAddress    The fix address of the ResourceManager
+	 * @param webMonitorAddress         The fixed address of the web monitor
 	 */
 	public StandaloneHaServices(
-		String resourceManagerAddress,
-		String dispatcherAddress,
-		String jobManagerAddress) {
+			String resourceManagerAddress,
+			String dispatcherAddress,
+			String jobManagerAddress,
+			String webMonitorAddress) {
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
 		this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
 		this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
+		this.webMonitorAddress = checkNotNull(webMonitorAddress, "webMonitorAddress");
 	}
 
 	// ------------------------------------------------------------------------
@@ -132,4 +137,23 @@ public class StandaloneHaServices extends AbstractNonHaServices {
 			return new StandaloneLeaderElectionService();
 		}
 	}
+
+	@Override
+	public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
+		}
+	}
+
+	@Override
+	public LeaderElectionService getWebMonitorLeaderElectionService() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderElectionService();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 04ab6d3..6d5c721 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.highavailability.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -35,6 +33,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.curator.framework.CuratorFramework;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
@@ -86,6 +86,8 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
 
+	private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
+
 	// ------------------------------------------------------------------------
 	
 	
@@ -142,6 +144,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getWebMonitorLeaderRetriever() {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
@@ -157,6 +164,11 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
+	public LeaderElectionService getWebMonitorLeaderElectionService() {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
+	}
+
+	@Override
 	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
index 4baac95..1ed191f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRestEndpoint.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
@@ -34,13 +36,15 @@ import java.util.concurrent.Executor;
 public class JobMasterRestEndpoint extends WebMonitorEndpoint<JobMasterGateway> {
 
 	public JobMasterRestEndpoint(
-			RestServerEndpointConfiguration endpointConfiguration,
-			GatewayRetriever<JobMasterGateway> leaderRetriever,
-			Configuration clusterConfiguration,
-			RestHandlerConfiguration restConfiguration,
-			GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever) {
-		super(endpointConfiguration, leaderRetriever, clusterConfiguration, restConfiguration, resourceManagerRetriever, executor, metricQueryServiceRetriever);
+		RestServerEndpointConfiguration endpointConfiguration,
+		GatewayRetriever<JobMasterGateway> leaderRetriever,
+		Configuration clusterConfiguration,
+		RestHandlerConfiguration restConfiguration,
+		GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService,
+		FatalErrorHandler fatalErrorHandler) {
+		super(endpointConfiguration, leaderRetriever, clusterConfiguration, restConfiguration, resourceManagerRetriever, executor, metricQueryServiceRetriever, leaderElectionService, fatalErrorHandler);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 2db1fce..59d3592 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -413,4 +413,11 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 	public void unhandledError(String message, Throwable e) {
 		leaderContender.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
 	}
+
+	@Override
+	public String toString() {
+		// Identify this service instance by its leader path.
+		final String quotedLeaderPath = "'" + leaderPath + "'";
+		return "ZooKeeperLeaderElectionService{leaderPath=" + quotedLeaderPath + "}";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac8225fd/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
index b5ba4e9..a9c7e5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalListener.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -32,7 +34,7 @@ public interface LeaderRetrievalListener {
 	 * @param leaderAddress The address of the new leader
 	 * @param leaderSessionID The new leader session ID
 	 */
-	void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
+	void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
 
 	/**
 	 * This method is called by the {@link LeaderRetrievalService} in case of an exception. This