You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/10/20 09:59:18 UTC

[27/47] flink git commit: [FLINK-2793] [runtime-web] Rework JobManagerRetriever to avoid race conditions

[FLINK-2793] [runtime-web] Rework JobManagerRetriever to avoid race conditions

The JobManagerRetriever now sets the new leaderGatewayPortFuture directly in the notifyLeaderAddress
method instead of in the completion callback of one of the futures. This avoids race conditions
between multiple futures that complete in a different order than they were started. Furthermore,
this replaces promises with futures where a promise is not needed.

Add logging statement

Fix WebRuntimeMonitorITCase to use random port and proper state backend

Add ChannelHandler.Sharable to RuntimeMonitorHandler

Remove sanity check from WebInfoServer to let it work on Yarn


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ad9621
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ad9621
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ad9621

Branch: refs/heads/master
Commit: e3ad96211ebcab4317a7bb1ba42dfb1a9302aafd
Parents: 77fc0cc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 9 23:33:25 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 20 00:16:53 2015 +0200

----------------------------------------------------------------------
 flink-runtime-web/pom.xml                       |   8 ++
 .../runtime/webmonitor/JobManagerRetriever.java | 110 ++++++++++---------
 .../webmonitor/RuntimeMonitorHandler.java       |  54 +++++----
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  10 +-
 .../files/StaticFileServerHandler.java          |  63 ++++++-----
 .../handlers/HandlerRedirectUtils.java          |   2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  23 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  21 ++--
 .../ExecutionGraphRestartTest.scala             |  16 ++-
 tools/log4j-travis.properties                   |   1 +
 10 files changed, 181 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 727604f..f2ac818 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -109,6 +109,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
index 7162639..93db280 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerRetriever.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -35,9 +36,11 @@ import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -56,17 +59,14 @@ public class JobManagerRetriever implements LeaderRetrievalListener {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
 
-	private final Object lock = new Object();
+	private final Object waitLock = new Object();
 
 	private final WebMonitor webMonitor;
 	private final ActorSystem actorSystem;
 	private final FiniteDuration lookupTimeout;
 	private final FiniteDuration timeout;
 
-	private volatile Tuple2<Promise<ActorGateway>, Promise<Integer>> leaderPromise =
-			new Tuple2<Promise<ActorGateway>, Promise<Integer>>(
-					new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>(),
-					new scala.concurrent.impl.Promise.DefaultPromise<Integer>());
+	private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;
 
 	public JobManagerRetriever(
 			WebMonitor webMonitor,
@@ -81,22 +81,21 @@ public class JobManagerRetriever implements LeaderRetrievalListener {
 	}
 
 	/**
-	 * Returns the leading job manager gateway and its web monitor port.
+	 * Returns the currently known leading job manager gateway and its web monitor port.
 	 */
 	public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
-		Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = leaderPromise;
+		if (leaderGatewayPortFuture != null) {
+			Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = leaderGatewayPortFuture;
 
-		if (!promise._1().isCompleted() || !promise._1().isCompleted()) {
-			return Option.empty();
-		}
-		else {
-			Promise<ActorGateway> leaderGatewayPromise = promise._1();
-			Promise<Integer> leaderWebPortPromise = promise._2();
+			if (gatewayPortFuture.isCompleted()) {
+				Tuple2<ActorGateway, Integer> gatewayPort = Await.result(gatewayPortFuture, timeout);
 
-			ActorGateway leaderGateway = Await.result(leaderGatewayPromise.future(), timeout);
-			int leaderWebPort = Await.result(leaderWebPortPromise.future(), timeout);
-
-			return Option.apply(new Tuple2<>(leaderGateway, leaderWebPort));
+				return Option.apply(gatewayPort);
+			} else {
+				return Option.empty();
+			}
+		} else {
+			return Option.empty();
 		}
 	}
 
@@ -104,66 +103,73 @@ public class JobManagerRetriever implements LeaderRetrievalListener {
 	 * Awaits the leading job manager gateway and its web monitor port.
 	 */
 	public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
-		Tuple2<Promise<ActorGateway>, Promise<Integer>> promise = leaderPromise;
+		Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
+		Deadline deadline = timeout.fromNow();
 
-		Promise<ActorGateway> leaderGatewayPromise = promise._1();
-		Promise<Integer> leaderWebPortPromise = promise._2();
+		while(!deadline.isOverdue()) {
+			synchronized (waitLock) {
+				gatewayPortFuture = leaderGatewayPortFuture;
 
-		ActorGateway leaderGateway = Await.result(leaderGatewayPromise.future(), timeout);
-		int leaderWebPort = Await.result(leaderWebPortPromise.future(), timeout);
+				if (gatewayPortFuture != null) {
+					break;
+				}
 
-		return new Tuple2<>(leaderGateway, leaderWebPort);
+				waitLock.wait(deadline.timeLeft().toMillis());
+			}
+		}
+
+		if (gatewayPortFuture == null) {
+			throw new TimeoutException("There is no JobManager available.");
+		} else {
+			return Await.result(gatewayPortFuture, deadline.timeLeft());
+		}
 	}
 
 	@Override
 	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
 		if (leaderAddress != null && !leaderAddress.equals("")) {
 			try {
-				final Promise<ActorGateway> gatewayPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
-				final Promise<Integer> webPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+				final Promise<Tuple2<ActorGateway, Integer>> leaderGatewayPortPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
-				final Tuple2<Promise<ActorGateway>, Promise<Integer>> newPromise = new Tuple2<>(
-						gatewayPromise, webPortPromise);
+				synchronized (waitLock) {
+					leaderGatewayPortFuture = leaderGatewayPortPromise.future();
+					waitLock.notifyAll();
+				}
 
-				LOG.info("Retrieved leader notification {}:{}.", leaderAddress, leaderSessionID);
+				LOG.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
 
 				AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, lookupTimeout)
 						// Resolve the actor ref
-						.flatMap(new Mapper<ActorRef, Future<Object>>() {
+						.flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() {
 							@Override
-							public Future<Object> apply(ActorRef jobManagerRef) {
+							public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
 								ActorGateway leaderGateway = new AkkaActorGateway(
 										jobManagerRef, leaderSessionID);
 
-								gatewayPromise.success(leaderGateway);
+								Future<Object> webMonitorPort = leaderGateway.ask(
+									JobManagerMessages.getRequestWebMonitorPort(),
+									timeout);
 
-								return leaderGateway.ask(JobManagerMessages
-										.getRequestWebMonitorPort(), timeout);
+								return Futures.successful(leaderGateway).zip(webMonitorPort);
 							}
 						}, actorSystem.dispatcher())
 								// Request the web monitor port
-						.onComplete(new OnComplete<Object>() {
+						.onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() {
 							@Override
-							public void onComplete(Throwable failure, Object success) throws Throwable {
+							public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
 								if (failure == null) {
-									int webMonitorPort = ((ResponseWebMonitorPort) success).port();
-									webPortPromise.success(webMonitorPort);
-
-									// Complete the promise
-									synchronized (lock) {
-										Tuple2<Promise<ActorGateway>, Promise<Integer>>
-												previousPromise = leaderPromise;
-
-										leaderPromise = newPromise;
-
-										if (!previousPromise._2().isCompleted()) {
-											previousPromise._1().completeWith(gatewayPromise.future());
-											previousPromise._2().completeWith(webPortPromise.future());
-										}
+									if (success._2() instanceof ResponseWebMonitorPort) {
+										int webMonitorPort = ((ResponseWebMonitorPort) success._2()).port();
+
+										leaderGatewayPortPromise.success(new Tuple2<>(success._1(), webMonitorPort));
+									} else {
+										leaderGatewayPortPromise.failure(new Exception("Received the message " +
+										success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() +
+											". But a message of type " + ResponseWebMonitorPort.class + " was expected."));
 									}
-								}
-								else {
-									LOG.warn("Failed to retrieve leader gateway and port.");
+								} else {
+									LOG.warn("Failed to retrieve leader gateway and port.", failure);
+									leaderGatewayPortPromise.failure(failure);
 								}
 							}
 						}, actorSystem.dispatcher());

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index b9369ea..e174463 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -35,7 +36,9 @@ import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Promise;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.nio.charset.Charset;
 
@@ -47,6 +50,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * This handler also deals with setting correct response MIME types and returning
  * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
  */
+@ChannelHandler.Sharable
 public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
 
 	private static final Charset ENCODING = Charset.forName("UTF-8");
@@ -55,7 +59,9 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
 
 	private final JobManagerRetriever retriever;
 
-	private final Promise<String> localJobManagerAddressPromise;
+	private final Future<String> localJobManagerAddressFuture;
+
+	private final FiniteDuration timeout;
 
 	private final String contentType;
 
@@ -64,35 +70,41 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {
 	public RuntimeMonitorHandler(
 			RequestHandler handler,
 			JobManagerRetriever retriever,
-			Promise<String> localJobManagerAddressPromise) {
+			Future<String> localJobManagerAddressFuture,
+			FiniteDuration timeout) {
 
 		this.handler = checkNotNull(handler);
 		this.retriever = checkNotNull(retriever);
-		this.localJobManagerAddressPromise = checkNotNull(localJobManagerAddressPromise);
+		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
+		this.timeout = checkNotNull(timeout);
 		this.contentType = (handler instanceof RequestHandler.JsonResponse) ? "application/json" : "text/plain";
 	}
 
 	@Override
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddress == null) {
-			localJobManagerAddress = localJobManagerAddressPromise.future().value().get().get();
-		}
-
-		Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
-
-		if (jobManager.isDefined()) {
-			String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
-					localJobManagerAddress, jobManager.get());
-
-			if (redirectAddress != null) {
-				HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
-				KeepAliveWrite.flush(ctx, routed.request(), redirect);
+		if (localJobManagerAddressFuture.isCompleted()) {
+			if (localJobManagerAddress == null) {
+				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
 			}
-			else {
-				respondAsLeader(ctx, routed, jobManager.get()._1());
+
+			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+
+			if (jobManager.isDefined()) {
+				Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get();
+				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
+					localJobManagerAddress, gatewayPort);
+
+				if (redirectAddress != null) {
+					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
+					KeepAliveWrite.flush(ctx, routed.request(), redirect);
+				}
+				else {
+					respondAsLeader(ctx, routed, gatewayPort._1());
+				}
+			} else {
+				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
 			}
-		}
-		else {
+		} else {
 			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 40ab6c1..ec973c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -115,6 +115,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final Promise<String> jobManagerAddressPromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
 
+	private final FiniteDuration timeout;
+
 	private Channel serverChannel;
 
 	private final File webRootDir;
@@ -174,7 +176,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
 
-		FiniteDuration timeout = AkkaUtils.getTimeout(config);
+		timeout = AkkaUtils.getTimeout(config);
 		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
 
 		retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, timeout);
@@ -218,10 +220,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
 
 			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
-			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))
+			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 
 			// this handler serves all the static contents
-			.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise, webRootDir));
+			.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
 
 		synchronized (startupShutdownLock) {
 
@@ -335,6 +337,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 
 	private RuntimeMonitorHandler handler(RequestHandler handler) {
-		return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise);
+		return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 944407e..d46a900 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -52,7 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Promise;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -108,7 +110,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 	/** JobManager retriever */
 	private final JobManagerRetriever retriever;
 
-	private final Promise<String> localJobManagerAddressPromise;
+	private final Future<String> localJobManagerAddressFuture;
+
+	private final FiniteDuration timeout;
 
 	/** The path in which the static documents are */
 	private final File rootPath;
@@ -120,20 +124,23 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Promise<String> localJobManagerAddressPromise,
+			Future<String> localJobManagerAddressPromise,
+			FiniteDuration timeout,
 			File rootPath) {
 
-		this(retriever, localJobManagerAddressPromise, rootPath, DEFAULT_LOGGER);
+		this(retriever, localJobManagerAddressPromise, timeout, rootPath, DEFAULT_LOGGER);
 	}
 
 	public StaticFileServerHandler(
 			JobManagerRetriever retriever,
-			Promise<String> localJobManagerAddressPromise,
+			Future<String> localJobManagerAddressFuture,
+			FiniteDuration timeout,
 			File rootPath,
 			Logger logger) {
 
 		this.retriever = checkNotNull(retriever);
-		this.localJobManagerAddressPromise = localJobManagerAddressPromise;
+		this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
+		this.timeout = checkNotNull(timeout);
 		this.rootPath = checkNotNull(rootPath);
 		this.logger = checkNotNull(logger);
 	}
@@ -144,41 +151,45 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 
 	@Override
 	public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-		if (localJobManagerAddress == null) {
-			localJobManagerAddress = localJobManagerAddressPromise.future().value().get().get();
-		}
+		if (localJobManagerAddressFuture.isCompleted()) {
+			if (localJobManagerAddress == null) {
+				localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout);
+			}
 
-		final HttpRequest request = routed.request();
-		String requestPath = routed.path();
+			final HttpRequest request = routed.request();
+			String requestPath = routed.path();
 
-		// make sure we request the "index.html" in case there is a directory request
-		if (requestPath.endsWith("/")) {
-			requestPath = requestPath + "index.html";
-		}
+			// make sure we request the "index.html" in case there is a directory request
+			if (requestPath.endsWith("/")) {
+				requestPath = requestPath + "index.html";
+			}
 
 		// in case the files being accessed are logs or stdout files, find appropriate paths.
 		if (requestPath.equals("/jobmanager/log")) {
 			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
 		} else if (requestPath.equals("/jobmanager/stdout")) {
 			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
-		}
+			}
 
-		Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
+			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
 
-		if (jobManager.isDefined()) {
-			// Redirect to leader if necessary
-			String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
+			if (jobManager.isDefined()) {
+				// Redirect to leader if necessary
+				String redirectAddress = HandlerRedirectUtils.getRedirectAddress(
 					localJobManagerAddress, jobManager.get());
 
-			if (redirectAddress != null) {
-				HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
-				KeepAliveWrite.flush(ctx, routed.request(), redirect);
+				if (redirectAddress != null) {
+					HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
+					KeepAliveWrite.flush(ctx, routed.request(), redirect);
+				}
+				else {
+					respondAsLeader(ctx, request, requestPath);
+				}
 			}
 			else {
-				respondAsLeader(ctx, request, requestPath);
+				KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
 			}
-		}
-		else {
+		} else {
 			KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 887c46e..800c7c0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -67,7 +67,7 @@ public class HandlerRedirectUtils {
 				return redirectAddress;
 			}
 			else {
-				LOG.warn("Unexpected leader address pattern. Cannot extract host.");
+				LOG.warn("Unexpected leader address pattern {}. Cannot extract host.", leaderAddress);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 26f66b0..5167d13 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -27,13 +27,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.powermock.reflect.Whitebox;
 import scala.Some;
 import scala.Tuple2;
@@ -50,7 +55,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class WebRuntimeMonitorITCase {
+public class WebRuntimeMonitorITCase extends TestLogger {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private final static FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
@@ -77,6 +85,7 @@ public class WebRuntimeMonitorITCase {
 			Configuration monitorConfig = new Configuration();
 			monitorConfig.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, MAIN_RESOURCES_PATH);
 			monitorConfig.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
+			monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 
 			// Needs to match the leader address from the leader retrieval service
 			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
@@ -131,12 +140,12 @@ public class WebRuntimeMonitorITCase {
 		List<LeaderRetrievalService> leaderRetrievalServices = new ArrayList<>();
 
 		try (TestingServer zooKeeper = new TestingServer()) {
-			final Configuration config = new Configuration();
+			final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+				zooKeeper.getConnectString(),
+				temporaryFolder.getRoot().getPath());
 			config.setString(WebMonitorConfig.JOB_MANAGER_WEB_DOC_ROOT_KEY, MAIN_RESOURCES_PATH);
 			config.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-			config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 
 			for (int i = 0; i < jobManagerSystem.length; i++) {
 				jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
@@ -157,7 +166,11 @@ public class WebRuntimeMonitorITCase {
 						webMonitor[i].getServerPort());
 
 				jobManager[i] = JobManager.startJobManagerActors(
-						jmConfig, jobManagerSystem[i], StreamingMode.STREAMING)._1();
+					jmConfig,
+					jobManagerSystem[i],
+					StreamingMode.STREAMING,
+					JobManager.class,
+					MemoryArchivist.class)._1();
 
 				jobManagerAddress[i] = AkkaUtils.getAkkaURL(jobManagerSystem[i], jobManager[i]);
 				webMonitor[i].start(jobManagerAddress[i]);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 161e8de..ebc0ea9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1624,7 +1624,6 @@ object JobManager {
         monitor =>
           val jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(configuration)
           monitor.start(jobManagerAkkaUrl)
-
         LOG.info("Starting JobManger web frontend")
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
@@ -1632,11 +1631,9 @@ object JobManager {
           configuration,
           leaderRetrievalService,
           jobManagerSystem)
-        Option(webServer)
-      } else {
-        None
       }
 
+
       (jobManagerSystem, jobManager, archive, webMonitor)
     }
     catch {
@@ -1930,14 +1927,14 @@ object JobManager {
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
-                             configuration: Configuration,
-                             actorSystem: ActorSystem,
-                             jobMangerActorName: Option[String],
-                             archiveActorName: Option[String],
-                             streamingMode: StreamingMode,
-                             jobManagerClass: Class[_ <: JobManager],
-                             archiveClass: Class[_ <: MemoryArchivist])
-  : (ActorRef, ActorRef) = {
+      configuration: Configuration,
+      actorSystem: ActorSystem,
+      jobMangerActorName: Option[String],
+      archiveActorName: Option[String],
+      streamingMode: StreamingMode,
+      jobManagerClass: Class[_ <: JobManager],
+      archiveClass: Class[_ <: MemoryArchivist])
+    : (ActorRef, ActorRef) = {
 
     val (executionContext,
     instanceManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 9a1cde0..e41d7ff 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -144,15 +144,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
         // Wait for deploying after async restart
         deadline = timeout.fromNow
         while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
-          _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) {
+          _.getCurrentExecutionAttempt.getAssignedResource == null)) {
           Thread.sleep(100)
         }
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().markFinished()
-        }
 
-        eg.getState() should equal(JobStatus.FINISHED)
+        if (deadline.hasTimeLeft()) {
+          for (vertex <- eg.getAllExecutionVertices.asScala) {
+            vertex.getCurrentExecutionAttempt().markFinished()
+          }
+
+          eg.getState() should equal(JobStatus.FINISHED)
+        } else {
+          fail("Failed to wait until all execution attempts left the state DEPLOYING.")
+        }
       } catch {
         case t: Throwable =>
           t.printStackTrace()

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ad9621/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..d55209e 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -40,6 +40,7 @@ log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG
 log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
+log4j.logger.org.apache.flink.runtime.executiongraph=DEBUG
 
 # Log a bit when running the flink-yarn-tests to avoid running into the 5 minutes timeout for
 # the tests